From 4b2e71f301d233e0e20a7eb64c6c8be7ce1d0f99 Mon Sep 17 00:00:00 2001 From: Stefan Bueringer Date: Sat, 22 Jul 2023 15:57:04 +0200 Subject: [PATCH] move filter to separate package --- pkg/manager/internal.go | 102 +++++------------------------ pkg/manager/manager.go | 52 ++++----------- pkg/manager/manager_test.go | 32 +++++++-- pkg/metrics/filters.go | 31 +++++++++ pkg/metrics/filters/filters.go | 116 +++++++++++++++++++++++++++++++++ pkg/metrics/listener.go | 4 ++ 6 files changed, 204 insertions(+), 133 deletions(-) create mode 100644 pkg/metrics/filters.go create mode 100644 pkg/metrics/filters/filters.go diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index fad1521a54..e830f9bdcb 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -32,12 +32,6 @@ import ( "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" kerrors "k8s.io/apimachinery/pkg/util/errors" - "k8s.io/apiserver/pkg/authentication/authenticatorfactory" - "k8s.io/apiserver/pkg/authorization/authorizer" - "k8s.io/apiserver/pkg/authorization/authorizerfactory" - "k8s.io/apiserver/pkg/server/options" - authenticationv1 "k8s.io/client-go/kubernetes/typed/authentication/v1" - authorizationv1 "k8s.io/client-go/kubernetes/typed/authorization/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" @@ -93,19 +87,14 @@ type controllerManager struct { // metricsListener is used to serve prometheus metrics metricsListener net.Listener - // metricsSecureServing enables secure metrics serving. - // This means metrics will be served via https and with authentication and authorization. - metricsSecureServing bool + // metricsFilter is a func that is added before the metrics handler on the metrics server. + // This can be e.g. used to enforce authentication and authorization on the metrics + // endpoint. + metricsFilter metrics.Filter // metricsExtraHandlers contains extra handlers to register on http server that serves metrics. metricsExtraHandlers map[string]http.Handler - // metricsAuthenticationClient is the client used to authenticate requests to the metrics endpoint. - metricsAuthenticationClient authenticationv1.AuthenticationV1Interface - - // metricsAuthorizationClient is the client used to authorize requests to the metrics endpoint. - metricsAuthorizationClient authorizationv1.AuthorizationV1Interface - // healthProbeListener is used to serve liveness probe healthProbeListener net.Listener @@ -322,17 +311,24 @@ func (cm *controllerManager) addMetricsServer() error { log := cm.logger.WithValues("path", defaultMetricsEndpoint) - if cm.metricsSecureServing { + if cm.metricsFilter != nil { var err error - handler, err = withAuthenticationAndAuthorization(log, cm.metricsAuthenticationClient, cm.metricsAuthorizationClient, handler) + handler, err = cm.metricsFilter(log, handler) if err != nil { - return fmt.Errorf("failed to add metrics server: %w", err) + return fmt.Errorf("failed to add metrics server: failed to add metrics filter %w", err) } } - // TODO(JoelSpeed): Use existing Kubernetes machinery for serving metrics mux.Handle(defaultMetricsEndpoint, handler) + for path, extraHandler := range cm.metricsExtraHandlers { + if cm.metricsFilter != nil { + var err error + extraHandler, err = cm.metricsFilter(log, extraHandler) + if err != nil { + return fmt.Errorf("failed to add metrics server: failed to add metrics filter to extra handler %w", err) + } + } mux.Handle(path, extraHandler) } @@ -344,74 +340,6 @@ func (cm *controllerManager) addMetricsServer() error { }) } -func withAuthenticationAndAuthorization(log logr.Logger, authenticationClient authenticationv1.AuthenticationV1Interface, authorizationClient authorizationv1.AuthorizationV1Interface, handler http.Handler) (http.Handler, error) { - authenticatorConfig := authenticatorfactory.DelegatingAuthenticatorConfig{ - Anonymous: false, // Require authentication. - CacheTTL: 1 * time.Minute, - TokenAccessReviewClient: authenticationClient, - TokenAccessReviewTimeout: 10 * time.Second, - WebhookRetryBackoff: options.DefaultAuthWebhookRetryBackoff(), - } - delegatingAuthenticator, _, err := authenticatorConfig.New() - if err != nil { - return nil, fmt.Errorf("failed to create authenticator: %w", err) - } - - authorizerConfig := authorizerfactory.DelegatingAuthorizerConfig{ - SubjectAccessReviewClient: authorizationClient, - AllowCacheTTL: 5 * time.Minute, - DenyCacheTTL: 30 * time.Second, - WebhookRetryBackoff: options.DefaultAuthWebhookRetryBackoff(), - } - delegatingAuthorizer, err := authorizerConfig.New() - if err != nil { - return nil, fmt.Errorf("failed to create authorizer: %w", err) - } - - return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - if req.Method != http.MethodGet { - http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) - return - } - - ctx := req.Context() - - res, ok, err := delegatingAuthenticator.AuthenticateRequest(req) - if err != nil { - log.Error(err, "Authentication failed", err) - http.Error(w, "Authentication failed", http.StatusInternalServerError) - return - } - if !ok { - log.V(4).Info("Authentication failed") - http.Error(w, "Unauthorized", http.StatusUnauthorized) - return - } - - attributes := authorizer.AttributesRecord{ - User: res.User, - Verb: "get", - Path: req.URL.Path, - } - - authorized, reason, err := delegatingAuthorizer.Authorize(ctx, attributes) - if err != nil { - msg := fmt.Sprintf("Authorization for user %s failed", attributes.User.GetName()) - log.Error(err, msg) - http.Error(w, msg, http.StatusInternalServerError) - return - } - if authorized != authorizer.DecisionAllow { - msg := fmt.Sprintf("Authorization denied for user %s", attributes.User.GetName()) - log.V(4).Info(fmt.Sprintf("%s: %s", msg, reason)) - http.Error(w, msg, http.StatusForbidden) - return - } - - handler.ServeHTTP(w, req) - }), nil -} - func (cm *controllerManager) serveHealthProbes() { mux := http.NewServeMux() server := httpserver.New(mux) diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index e75bdd07b3..e8a67ffdc7 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -29,9 +29,6 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/kubernetes" - authenticationv1 "k8s.io/client-go/kubernetes/typed/authentication/v1" - authorizationv1 "k8s.io/client-go/kubernetes/typed/authorization/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/client-go/tools/record" @@ -263,37 +260,19 @@ type Options struct { // for serving prometheus metrics. // It can be set to "0" to disable the metrics serving. // - // Per default metrics will be served via http and without authentication and authorization. - // If MetricsSecureServing is enabled metrics will be served via https and authenticated (via TokenReviews) - // and authorized (via SubjectAccessReviews) with the kube-apiserver. + // Per default metrics will be served via http. If MetricsSecureServing is enabled + // metrics will be served via https. MetricsBindAddress string - // MetricsSecureServing enables secure metrics serving. - // This means metrics will be served via https and authenticated (via TokenReviews) - // and authorized (via SubjectAccessReviews) with the kube-apiserver. - // - // For the authentication and authorization the controller needs a ClusterRole - // with the following rules: - // - apiGroups: - // - authentication.k8s.io - // resources: - // - tokenreviews - // verbs: - // - create - // - apiGroups: - // - authorization.k8s.io - // resources: - // - subjectaccessreviews - // verbs: - // - create - // To scrape metrics e.g. via Prometheus the client needs a ClusterRole - // with the following rule: - // - nonResourceURLs: - // - "/metrics" - // verbs: - // - get + // MetricsSecureServing enables serving metrics via https. MetricsSecureServing bool + // MetricsFilterProvider provides a metrics filter which is a func that is added before + // the metrics handler on the metrics server. + // This can be e.g. used to enforce authentication and authorization on the metrics + // endpoint by setting this field to filters.WithAuthenticationAndAuthorization. + MetricsFilterProvider func(c *rest.Config, httpClient *http.Client) (metrics.Filter, error) + // HealthProbeBindAddress is the TCP address that the controller should bind to // for serving health probes // It can be set to "0" or "" to disable serving the health probe. @@ -483,15 +462,12 @@ func New(config *rest.Config, options Options) (Manager, error) { } } - var metricsAuthenticationClient authenticationv1.AuthenticationV1Interface - var metricsAuthorizationClient authorizationv1.AuthorizationV1Interface - if options.MetricsSecureServing { - metricsKubeClient, err := kubernetes.NewForConfigAndClient(config, cluster.GetHTTPClient()) + var metricsFilter metrics.Filter + if options.MetricsFilterProvider != nil { + metricsFilter, err = options.MetricsFilterProvider(config, cluster.GetHTTPClient()) if err != nil { return nil, err } - metricsAuthenticationClient = metricsKubeClient.AuthenticationV1() - metricsAuthorizationClient = metricsKubeClient.AuthorizationV1() } // Create the metrics listener. This will throw an error if the metrics bind @@ -529,10 +505,8 @@ func New(config *rest.Config, options Options) (Manager, error) { recorderProvider: recorderProvider, resourceLock: resourceLock, metricsListener: metricsListener, - metricsSecureServing: options.MetricsSecureServing, + metricsFilter: metricsFilter, metricsExtraHandlers: metricsExtraHandlers, - metricsAuthenticationClient: metricsAuthenticationClient, - metricsAuthorizationClient: metricsAuthorizationClient, controllerConfig: options.Controller, logger: options.Logger, elected: make(chan struct{}), diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index b853470c4e..f222e6aaa8 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -55,6 +55,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/leaderelection" fakeleaderelection "sigs.k8s.io/controller-runtime/pkg/leaderelection/fake" "sigs.k8s.io/controller-runtime/pkg/metrics" + "sigs.k8s.io/controller-runtime/pkg/metrics/filters" "sigs.k8s.io/controller-runtime/pkg/recorder" "sigs.k8s.io/controller-runtime/pkg/webhook" ) @@ -1380,7 +1381,8 @@ var _ = Describe("manger.Manager", func() { BeforeEach(func() { listener = nil opts = Options{ - MetricsSecureServing: true, + MetricsSecureServing: true, + MetricsFilterProvider: filters.WithAuthenticationAndAuthorization, newMetricsListener: func(addr string, secureServing bool) (net.Listener, error) { var err error listener, err = metrics.NewListener(addr, secureServing) @@ -1443,7 +1445,7 @@ var _ = Describe("manger.Manager", func() { }() <-m.Elected() - // Setup service account with rights to "get" "/metrics" + // Setup service account with rights to "/metrics" token, cleanup, err := setupServiceAccountForURL(ctx, m.GetClient(), "/metrics") defer cleanup() Expect(err).ToNot(HaveOccurred()) @@ -1456,6 +1458,7 @@ var _ = Describe("manger.Manager", func() { resp, err := httpClient.Do(req) Expect(err).NotTo(HaveOccurred()) defer resp.Body.Close() + // This is expected as the token has rights for /metrics. Expect(resp.StatusCode).To(Equal(200)) body, err := io.ReadAll(resp.Body) Expect(err).NotTo(HaveOccurred()) @@ -1503,7 +1506,7 @@ var _ = Describe("manger.Manager", func() { }() <-m.Elected() - // Setup service account with rights to "get" "/metrics" + // Setup service account with rights to "/metrics" token, cleanup, err := setupServiceAccountForURL(ctx, m.GetClient(), "/metrics") defer cleanup() Expect(err).ToNot(HaveOccurred()) @@ -1516,6 +1519,7 @@ var _ = Describe("manger.Manager", func() { resp, err := httpClient.Do(req) Expect(err).NotTo(HaveOccurred()) defer resp.Body.Close() + // This is expected as the token has rights for /metrics. Expect(resp.StatusCode).To(Equal(200)) data, err := io.ReadAll(resp.Body) Expect(err).NotTo(HaveOccurred()) @@ -1554,7 +1558,7 @@ var _ = Describe("manger.Manager", func() { }() <-m.Elected() - // Setup service account with rights to "get" "/debug" + // Setup service account with rights to "/debug" token, cleanup, err := setupServiceAccountForURL(ctx, m.GetClient(), "/debug") defer cleanup() Expect(err).ToNot(HaveOccurred()) @@ -1562,14 +1566,28 @@ var _ = Describe("manger.Manager", func() { endpoint := fmt.Sprintf("https://%s/debug", listener.Addr().String()) req, err := http.NewRequest("GET", endpoint, nil) Expect(err).NotTo(HaveOccurred()) - req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", token)) resp, err := httpClient.Do(req) Expect(err).NotTo(HaveOccurred()) defer resp.Body.Close() - Expect(resp.StatusCode).To(Equal(200)) + // This is expected as we didn't send a token. + Expect(resp.StatusCode).To(Equal(401)) body, err := io.ReadAll(resp.Body) Expect(err).NotTo(HaveOccurred()) + // Unauthorized is expected as we didn't send a token. + Expect(string(body)).To(ContainSubstring("Unauthorized")) + + req, err = http.NewRequest("PUT", endpoint, nil) + Expect(err).NotTo(HaveOccurred()) + req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", token)) + + resp, err = httpClient.Do(req) + Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() + // This is expected as the token has rights for /debug. + Expect(resp.StatusCode).To(Equal(200)) + body, err = io.ReadAll(resp.Body) + Expect(err).NotTo(HaveOccurred()) Expect(string(body)).To(Equal("Some debug info")) metricsEndpoint := fmt.Sprintf("https://%s/metrics", listener.Addr().String()) @@ -2065,7 +2083,7 @@ func setupServiceAccountForURL(ctx context.Context, c client.Client, path string }, Rules: []rbacv1.PolicyRule{ { - Verbs: []string{"get"}, + Verbs: []string{"get", "put"}, NonResourceURLs: []string{path}, }, }, diff --git a/pkg/metrics/filters.go b/pkg/metrics/filters.go new file mode 100644 index 0000000000..0aa6967d17 --- /dev/null +++ b/pkg/metrics/filters.go @@ -0,0 +1,31 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +/* +Package metrics contains controller related metrics utilities +*/ +package metrics + +import ( + "net/http" + + "github.com/go-logr/logr" +) + +// Filter is a func that is added before the metrics handler on the metrics server. +// This can be e.g. used to enforce authentication and authorization on the metrics +// endpoint by setting this field to filters.WithAuthenticationAndAuthorization. +type Filter func(log logr.Logger, handler http.Handler) (http.Handler, error) diff --git a/pkg/metrics/filters/filters.go b/pkg/metrics/filters/filters.go new file mode 100644 index 0000000000..1ea437a66e --- /dev/null +++ b/pkg/metrics/filters/filters.go @@ -0,0 +1,116 @@ +package filters + +import ( + "fmt" + "net/http" + "strings" + "time" + + "github.com/go-logr/logr" + "k8s.io/apiserver/pkg/authentication/authenticatorfactory" + "k8s.io/apiserver/pkg/authorization/authorizer" + "k8s.io/apiserver/pkg/authorization/authorizerfactory" + "k8s.io/apiserver/pkg/server/options" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + + "sigs.k8s.io/controller-runtime/pkg/metrics" +) + +// WithAuthenticationAndAuthorization provides a metrics.Filter for authentication and authorization. +// Metrics will be authenticated (via TokenReviews) and authorized (via SubjectAccessReviews) with the +// kube-apiserver. +// For the authentication and authorization the controller needs a ClusterRole +// with the following rules: +// - apiGroups: +// - authentication.k8s.io +// resources: +// - tokenreviews +// verbs: +// - create +// +// - apiGroups: +// - authorization.k8s.io +// resources: +// - subjectaccessreviews +// verbs: +// - create +// +// To scrape metrics e.g. via Prometheus the client needs a ClusterRole +// with the following rule: +// - nonResourceURLs: +// - "/metrics" +// verbs: +// - get +// +// Note: Please note that configuring this metrics provider will introduce a dependency to "k8s.io/apiserver" +// to your go module. +func WithAuthenticationAndAuthorization(config *rest.Config, httpClient *http.Client) (metrics.Filter, error) { + kubeClient, err := kubernetes.NewForConfigAndClient(config, httpClient) + if err != nil { + return nil, err + } + + authenticatorConfig := authenticatorfactory.DelegatingAuthenticatorConfig{ + Anonymous: false, // Require authentication. + CacheTTL: 1 * time.Minute, + TokenAccessReviewClient: kubeClient.AuthenticationV1(), + TokenAccessReviewTimeout: 10 * time.Second, + WebhookRetryBackoff: options.DefaultAuthWebhookRetryBackoff(), + } + delegatingAuthenticator, _, err := authenticatorConfig.New() + if err != nil { + return nil, fmt.Errorf("failed to create authenticator: %w", err) + } + + authorizerConfig := authorizerfactory.DelegatingAuthorizerConfig{ + SubjectAccessReviewClient: kubeClient.AuthorizationV1(), + AllowCacheTTL: 5 * time.Minute, + DenyCacheTTL: 30 * time.Second, + WebhookRetryBackoff: options.DefaultAuthWebhookRetryBackoff(), + } + delegatingAuthorizer, err := authorizerConfig.New() + if err != nil { + return nil, fmt.Errorf("failed to create authorizer: %w", err) + } + + return func(log logr.Logger, handler http.Handler) (http.Handler, error) { + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + ctx := req.Context() + + res, ok, err := delegatingAuthenticator.AuthenticateRequest(req) + if err != nil { + log.Error(err, "Authentication failed", err) + http.Error(w, "Authentication failed", http.StatusInternalServerError) + return + } + if !ok { + log.V(4).Info("Authentication failed") + http.Error(w, "Unauthorized", http.StatusUnauthorized) + return + } + + attributes := authorizer.AttributesRecord{ + User: res.User, + Verb: strings.ToLower(req.Method), + Path: req.URL.Path, + } + + authorized, reason, err := delegatingAuthorizer.Authorize(ctx, attributes) + if err != nil { + msg := fmt.Sprintf("Authorization for user %s failed", attributes.User.GetName()) + log.Error(err, msg) + http.Error(w, msg, http.StatusInternalServerError) + return + } + if authorized != authorizer.DecisionAllow { + msg := fmt.Sprintf("Authorization denied for user %s", attributes.User.GetName()) + log.V(4).Info(fmt.Sprintf("%s: %s", msg, reason)) + http.Error(w, msg, http.StatusForbidden) + return + } + + handler.ServeHTTP(w, req) + }), nil + }, nil +} diff --git a/pkg/metrics/listener.go b/pkg/metrics/listener.go index cf86af6368..aac8dda89c 100644 --- a/pkg/metrics/listener.go +++ b/pkg/metrics/listener.go @@ -55,6 +55,10 @@ func NewListener(addr string, secureServing bool) (net.Listener, error) { return ln, nil } + // Note: Using self-signed certificates here should be good enough as usually nobody goes through + // the trouble to verify the server certificate of a metrics endpoint. It's just important that we + // encrypt the communication. + // For example kube-controller-manager also uses a self-signed certificate for the metrics endpoint per default. cert, key, err := certutil.GenerateSelfSignedCertKeyWithFixtures("localhost", []net.IP{{127, 0, 0, 1}}, nil, "") if err != nil { return nil, fmt.Errorf("failed to generate self-signed certificate for metrics server: %w", err)