From 1504d8f65765d625b10e1e809f9b7548bea1ce12 Mon Sep 17 00:00:00 2001 From: Stefan Bueringer Date: Mon, 17 Jul 2023 22:09:38 +0200 Subject: [PATCH] Change defaulting, add tests --- pkg/manager/internal.go | 30 ++-- pkg/manager/manager.go | 38 ++-- pkg/manager/manager_test.go | 346 +++++++++++++++++++++++++++++++++++- pkg/metrics/listener.go | 4 +- 4 files changed, 379 insertions(+), 39 deletions(-) diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index 13ca188ae0..09bcd1eaaa 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -36,7 +36,8 @@ import ( "k8s.io/apiserver/pkg/authorization/authorizer" "k8s.io/apiserver/pkg/authorization/authorizerfactory" "k8s.io/apiserver/pkg/server/options" - "k8s.io/client-go/kubernetes" + authenticationv1 "k8s.io/client-go/kubernetes/typed/authentication/v1" + authorizationv1 "k8s.io/client-go/kubernetes/typed/authorization/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" @@ -92,15 +93,18 @@ type controllerManager struct { // metricsListener is used to serve prometheus metrics metricsListener net.Listener - // metricsInsecureServing enables insecure metrics serving. - // This means metrics will be served via http and without authentication and authorization. - metricsInsecureServing bool + // metricsSecureServing enables secure metrics serving. + // This means metrics will be served via https and with authentication and authorization. + metricsSecureServing bool // metricsExtraHandlers contains extra handlers to register on http server that serves metrics. metricsExtraHandlers map[string]http.Handler - // metricsKubeClient is the client used to authenticate and authorize requests to the metrics endpoint. - metricsKubeClient *kubernetes.Clientset + // metricsAuthenticationClient is the client used to authenticate requests to the metrics endpoint. + metricsAuthenticationClient authenticationv1.AuthenticationV1Interface + + // metricsAuthorizationClient is the client used to authorize requests to the metrics endpoint. + metricsAuthorizationClient authorizationv1.AuthorizationV1Interface // healthProbeListener is used to serve liveness probe healthProbeListener net.Listener @@ -318,9 +322,9 @@ func (cm *controllerManager) addMetricsServer() error { log := cm.logger.WithValues("path", defaultMetricsEndpoint) - if !cm.metricsInsecureServing { + if cm.metricsSecureServing { var err error - handler, err = withAuthenticationAndAuthorization(log, cm.metricsKubeClient, handler) + handler, err = withAuthenticationAndAuthorization(log, cm.metricsAuthenticationClient, cm.metricsAuthorizationClient, handler) if err != nil { return fmt.Errorf("failed to add metrics server: %w", err) } @@ -340,11 +344,11 @@ func (cm *controllerManager) addMetricsServer() error { }) } -func withAuthenticationAndAuthorization(log logr.Logger, metricsKubeClient *kubernetes.Clientset, handler http.Handler) (http.Handler, error) { +func withAuthenticationAndAuthorization(log logr.Logger, authenticationClient authenticationv1.AuthenticationV1Interface, authorizationClient authorizationv1.AuthorizationV1Interface, handler http.Handler) (http.Handler, error) { authenticatorConfig := authenticatorfactory.DelegatingAuthenticatorConfig{ Anonymous: false, // Require authentication. CacheTTL: 1 * time.Minute, - TokenAccessReviewClient: metricsKubeClient.AuthenticationV1(), + TokenAccessReviewClient: authenticationClient, TokenAccessReviewTimeout: 10 * time.Second, WebhookRetryBackoff: options.DefaultAuthWebhookRetryBackoff(), } @@ -354,7 +358,7 @@ func withAuthenticationAndAuthorization(log logr.Logger, metricsKubeClient *kube } authorizerConfig := authorizerfactory.DelegatingAuthorizerConfig{ - SubjectAccessReviewClient: metricsKubeClient.AuthorizationV1(), + SubjectAccessReviewClient: authorizationClient, AllowCacheTTL: 5 * time.Minute, DenyCacheTTL: 30 * time.Second, WebhookRetryBackoff: options.DefaultAuthWebhookRetryBackoff(), @@ -392,13 +396,13 @@ func withAuthenticationAndAuthorization(log logr.Logger, metricsKubeClient *kube authorized, reason, err := delegatingAuthorizer.Authorize(ctx, attributes) if err != nil { - msg := fmt.Sprintf("Authorization for user %s failed", attributes.User) + msg := fmt.Sprintf("Authorization for user %s failed", attributes.User.GetName()) log.Error(err, fmt.Sprintf("%s: %s", msg, err)) http.Error(w, msg, http.StatusInternalServerError) return } if authorized != authorizer.DecisionAllow { - msg := fmt.Sprintf("Authorization denied for user %s", attributes.User) + msg := fmt.Sprintf("Authorization denied for user %s", attributes.User.GetName()) log.V(4).Info(fmt.Sprintf("%s: %s", msg, reason)) http.Error(w, msg, http.StatusForbidden) return diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index a6eec04f36..e75bdd07b3 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -30,6 +30,8 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes" + authenticationv1 "k8s.io/client-go/kubernetes/typed/authentication/v1" + authorizationv1 "k8s.io/client-go/kubernetes/typed/authorization/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/client-go/tools/record" @@ -260,7 +262,14 @@ type Options struct { // MetricsBindAddress is the TCP address that the controller should bind to // for serving prometheus metrics. // It can be set to "0" to disable the metrics serving. - // Per default metrics will be served via https and authenticated (via TokenReviews) + // + // Per default metrics will be served via http and without authentication and authorization. + // If MetricsSecureServing is enabled metrics will be served via https and authenticated (via TokenReviews) + // and authorized (via SubjectAccessReviews) with the kube-apiserver. + MetricsBindAddress string + + // MetricsSecureServing enables secure metrics serving. + // This means metrics will be served via https and authenticated (via TokenReviews) // and authorized (via SubjectAccessReviews) with the kube-apiserver. // // For the authentication and authorization the controller needs a ClusterRole @@ -283,13 +292,7 @@ type Options struct { // - "/metrics" // verbs: // - get - // Alternatively MetricsInsecureServing can be set to true, then metrics will - // be served via http and authentication and authorization is skipped. - MetricsBindAddress string - - // MetricsInsecureServing enables insecure metrics serving. - // This means metrics will be served via http and authentication and authorization is skipped. - MetricsInsecureServing bool + MetricsSecureServing bool // HealthProbeBindAddress is the TCP address that the controller should bind to // for serving health probes @@ -383,7 +386,7 @@ type Options struct { // Dependency injection for testing newRecorderProvider func(config *rest.Config, httpClient *http.Client, scheme *runtime.Scheme, logger logr.Logger, makeBroadcaster intrec.EventBroadcasterProducer) (*intrec.Provider, error) newResourceLock func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error) - newMetricsListener func(addr string, insecureServing bool) (net.Listener, error) + newMetricsListener func(addr string, secureServing bool) (net.Listener, error) newHealthProbeListener func(addr string) (net.Listener, error) newPprofListener func(addr string) (net.Listener, error) } @@ -480,18 +483,20 @@ func New(config *rest.Config, options Options) (Manager, error) { } } - var metricsKubeClient *kubernetes.Clientset - if !options.MetricsInsecureServing { - var err error - metricsKubeClient, err = kubernetes.NewForConfigAndClient(config, cluster.GetHTTPClient()) + var metricsAuthenticationClient authenticationv1.AuthenticationV1Interface + var metricsAuthorizationClient authorizationv1.AuthorizationV1Interface + if options.MetricsSecureServing { + metricsKubeClient, err := kubernetes.NewForConfigAndClient(config, cluster.GetHTTPClient()) if err != nil { return nil, err } + metricsAuthenticationClient = metricsKubeClient.AuthenticationV1() + metricsAuthorizationClient = metricsKubeClient.AuthorizationV1() } // Create the metrics listener. This will throw an error if the metrics bind // address is invalid or already in use. - metricsListener, err := options.newMetricsListener(options.MetricsBindAddress, options.MetricsInsecureServing) + metricsListener, err := options.newMetricsListener(options.MetricsBindAddress, options.MetricsSecureServing) if err != nil { return nil, err } @@ -524,9 +529,10 @@ func New(config *rest.Config, options Options) (Manager, error) { recorderProvider: recorderProvider, resourceLock: resourceLock, metricsListener: metricsListener, - metricsInsecureServing: options.MetricsInsecureServing, + metricsSecureServing: options.MetricsSecureServing, metricsExtraHandlers: metricsExtraHandlers, - metricsKubeClient: metricsKubeClient, + metricsAuthenticationClient: metricsAuthenticationClient, + metricsAuthorizationClient: metricsAuthorizationClient, controllerConfig: options.Controller, logger: options.Logger, elected: make(chan struct{}), diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index 0186f46f8b..85507fc076 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -34,12 +34,15 @@ import ( . "github.com/onsi/gomega" "github.com/prometheus/client_golang/prometheus" "go.uber.org/goleak" + authenticationv1 "k8s.io/api/authentication/v1" corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/rest" "k8s.io/client-go/tools/leaderelection/resourcelock" + "k8s.io/utils/pointer" configv1alpha1 "k8s.io/component-base/config/v1alpha1" @@ -569,9 +572,26 @@ var _ = Describe("manger.Manager", func() { var listener net.Listener m, err := New(cfg, Options{ MetricsBindAddress: ":0", - newMetricsListener: func(addr string) (net.Listener, error) { + newMetricsListener: func(addr string, secureServing bool) (net.Listener, error) { var err error - listener, err = metrics.NewListener(addr) + listener, err = metrics.NewListener(addr, secureServing) + return listener, err + }, + }) + Expect(m).ToNot(BeNil()) + Expect(err).ToNot(HaveOccurred()) + Expect(listener).ToNot(BeNil()) + Expect(listener.Close()).ToNot(HaveOccurred()) + }) + + It("should create a https listener for the metrics if a valid address is provided and secure serving enabled", func() { + var listener net.Listener + m, err := New(cfg, Options{ + MetricsBindAddress: ":0", + MetricsSecureServing: true, + newMetricsListener: func(addr string, secureServing bool) (net.Listener, error) { + var err error + listener, err = metrics.NewListener(addr, secureServing) return listener, err }, }) @@ -582,15 +602,36 @@ var _ = Describe("manger.Manager", func() { }) It("should return an error if the metrics bind address is already in use", func() { - ln, err := metrics.NewListener(":0") + ln, err := metrics.NewListener(":0", false) Expect(err).ShouldNot(HaveOccurred()) var listener net.Listener m, err := New(cfg, Options{ MetricsBindAddress: ln.Addr().String(), - newMetricsListener: func(addr string) (net.Listener, error) { + newMetricsListener: func(addr string, secureServing bool) (net.Listener, error) { var err error - listener, err = metrics.NewListener(addr) + listener, err = metrics.NewListener(addr, secureServing) + return listener, err + }, + }) + Expect(m).To(BeNil()) + Expect(err).To(HaveOccurred()) + Expect(listener).To(BeNil()) + + Expect(ln.Close()).ToNot(HaveOccurred()) + }) + + It("should return an error if the metrics bind address is already in use and secure serving enabled", func() { + ln, err := metrics.NewListener(":0", true) + Expect(err).ShouldNot(HaveOccurred()) + + var listener net.Listener + m, err := New(cfg, Options{ + MetricsBindAddress: ln.Addr().String(), + MetricsSecureServing: true, + newMetricsListener: func(addr string, secureServing bool) (net.Listener, error) { + var err error + listener, err = metrics.NewListener(addr, secureServing) return listener, err }, }) @@ -1176,9 +1217,9 @@ var _ = Describe("manger.Manager", func() { BeforeEach(func() { listener = nil opts = Options{ - newMetricsListener: func(addr string) (net.Listener, error) { + newMetricsListener: func(addr string, secureServing bool) (net.Listener, error) { var err error - listener, err = metrics.NewListener(addr) + listener, err = metrics.NewListener(addr, secureServing) return listener, err }, } @@ -1202,7 +1243,7 @@ var _ = Describe("manger.Manager", func() { }() // Check the metrics started - endpoint := fmt.Sprintf("http://%s", listener.Addr().String()) + endpoint := fmt.Sprintf("http://%s/metrics", listener.Addr().String()) _, err = http.Get(endpoint) Expect(err).NotTo(HaveOccurred()) @@ -1330,6 +1371,222 @@ var _ = Describe("manger.Manager", func() { Expect(string(body)).To(Equal("Some debug info")) }) }) + + Context("should start serving metrics via https if secure serving is enabled", func() { + var listener net.Listener + var opts Options + var httpClient *http.Client + + BeforeEach(func() { + listener = nil + opts = Options{ + MetricsSecureServing: true, + newMetricsListener: func(addr string, secureServing bool) (net.Listener, error) { + var err error + listener, err = metrics.NewListener(addr, secureServing) + return listener, err + }, + } + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, //nolint:gosec + } + httpClient = &http.Client{Transport: tr} + }) + + AfterEach(func() { + if listener != nil { + listener.Close() + } + }) + + It("should stop serving metrics when stop is called", func() { + opts.MetricsBindAddress = ":0" + m, err := New(cfg, opts) + Expect(err).NotTo(HaveOccurred()) + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + defer GinkgoRecover() + Expect(m.Start(ctx)).NotTo(HaveOccurred()) + }() + + // Check the metrics started + endpoint := fmt.Sprintf("https://%s/metrics", listener.Addr().String()) + resp, err := httpClient.Get(endpoint) + Expect(err).NotTo(HaveOccurred()) + body, err := io.ReadAll(resp.Body) + Expect(resp.StatusCode).To(Equal(401)) + Expect(err).NotTo(HaveOccurred()) + // Unauthorized is the expected response if no bearer token is provided. + Expect(string(body)).To(ContainSubstring("Unauthorized")) + + // Shutdown the server + cancel() + + // Expect the metrics server to shutdown + Eventually(func() error { + _, err = http.Get(endpoint) + return err + }).ShouldNot(Succeed()) + }) + + It("should serve metrics endpoint", func() { + opts.MetricsBindAddress = ":0" + m, err := New(cfg, opts) + Expect(err).NotTo(HaveOccurred()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + defer GinkgoRecover() + Expect(m.Start(ctx)).NotTo(HaveOccurred()) + }() + <-m.Elected() + + // Setup service account with rights to "get" "/metrics" + token, cleanup, err := setupServiceAccountForURL(ctx, m.GetClient(), "/metrics") + defer cleanup() + Expect(err).ToNot(HaveOccurred()) + + metricsEndpoint := fmt.Sprintf("https://%s/metrics", listener.Addr().String()) + req, err := http.NewRequest("GET", metricsEndpoint, nil) + Expect(err).NotTo(HaveOccurred()) + req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", token)) + + resp, err := httpClient.Do(req) + Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() + Expect(resp.StatusCode).To(Equal(200)) + body, err := io.ReadAll(resp.Body) + Expect(err).NotTo(HaveOccurred()) + Expect(string(body)).To(ContainSubstring("rest_client_requests_total")) + }) + + It("should not serve anything other than metrics endpoint by default", func() { + opts.MetricsBindAddress = ":0" + m, err := New(cfg, opts) + Expect(err).NotTo(HaveOccurred()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + defer GinkgoRecover() + Expect(m.Start(ctx)).NotTo(HaveOccurred()) + }() + <-m.Elected() + + endpoint := fmt.Sprintf("https://%s/should-not-exist", listener.Addr().String()) + resp, err := httpClient.Get(endpoint) + Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() + Expect(resp.StatusCode).To(Equal(404)) + }) + + It("should serve metrics in its registry", func() { + one := prometheus.NewCounter(prometheus.CounterOpts{ + Name: "test_one", + Help: "test metric for testing", + }) + one.Inc() + err := metrics.Registry.Register(one) + Expect(err).NotTo(HaveOccurred()) + + opts.MetricsBindAddress = ":0" + m, err := New(cfg, opts) + Expect(err).NotTo(HaveOccurred()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + defer GinkgoRecover() + Expect(m.Start(ctx)).NotTo(HaveOccurred()) + }() + <-m.Elected() + + // Setup service account with rights to "get" "/metrics" + token, cleanup, err := setupServiceAccountForURL(ctx, m.GetClient(), "/metrics") + defer cleanup() + Expect(err).ToNot(HaveOccurred()) + + metricsEndpoint := fmt.Sprintf("https://%s/metrics", listener.Addr().String()) + req, err := http.NewRequest("GET", metricsEndpoint, nil) + Expect(err).NotTo(HaveOccurred()) + req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", token)) + + resp, err := httpClient.Do(req) + Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() + Expect(resp.StatusCode).To(Equal(200)) + data, err := io.ReadAll(resp.Body) + Expect(err).NotTo(HaveOccurred()) + Expect(string(data)).To(ContainSubstring("%s\n%s\n%s\n", + `# HELP test_one test metric for testing`, + `# TYPE test_one counter`, + `test_one 1`, + )) + + // Unregister will return false if the metric was never registered + ok := metrics.Registry.Unregister(one) + Expect(ok).To(BeTrue()) + }) + + It("should serve extra endpoints", func() { + opts.MetricsBindAddress = ":0" + m, err := New(cfg, opts) + Expect(err).NotTo(HaveOccurred()) + + err = m.AddMetricsExtraHandler("/debug", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + _, _ = w.Write([]byte("Some debug info")) + })) + Expect(err).NotTo(HaveOccurred()) + + // Should error when we add another extra endpoint on the already registered path. + err = m.AddMetricsExtraHandler("/debug", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + _, _ = w.Write([]byte("Another debug info")) + })) + Expect(err).To(HaveOccurred()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + defer GinkgoRecover() + Expect(m.Start(ctx)).NotTo(HaveOccurred()) + }() + <-m.Elected() + + // Setup service account with rights to "get" "/debug" + token, cleanup, err := setupServiceAccountForURL(ctx, m.GetClient(), "/debug") + defer cleanup() + Expect(err).ToNot(HaveOccurred()) + + endpoint := fmt.Sprintf("https://%s/debug", listener.Addr().String()) + req, err := http.NewRequest("GET", endpoint, nil) + Expect(err).NotTo(HaveOccurred()) + req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", token)) + + resp, err := httpClient.Do(req) + Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() + Expect(resp.StatusCode).To(Equal(200)) + body, err := io.ReadAll(resp.Body) + Expect(err).NotTo(HaveOccurred()) + Expect(string(body)).To(Equal("Some debug info")) + + metricsEndpoint := fmt.Sprintf("https://%s/metrics", listener.Addr().String()) + req, err = http.NewRequest("GET", metricsEndpoint, nil) + Expect(err).NotTo(HaveOccurred()) + req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", token)) + + resp, err = httpClient.Do(req) + Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() + Expect(resp.StatusCode).To(Equal(403)) + body, err = io.ReadAll(resp.Body) + Expect(err).NotTo(HaveOccurred()) + // Unauthorized is expected as the token only has rights for /debug not for /metrics. + Expect(string(body)).To(ContainSubstring("Authorization denied for user system:serviceaccount:default:metrics-test")) + }) + }) }) Context("should start serving health probes", func() { @@ -1783,6 +2040,79 @@ var _ = Describe("manger.Manager", func() { }) }) +func setupServiceAccountForURL(ctx context.Context, c client.Client, path string) (string, func(), error) { + createdObjects := []client.Object{} + cleanup := func() { + for _, obj := range createdObjects { + _ = c.Delete(ctx, obj) + } + } + + sa := &corev1.ServiceAccount{ + ObjectMeta: metav1.ObjectMeta{ + Name: "metrics-test", + Namespace: metav1.NamespaceDefault, + }, + } + if err := c.Create(ctx, sa); err != nil { + return "", cleanup, err + } + createdObjects = append(createdObjects, sa) + + cr := &rbacv1.ClusterRole{ + ObjectMeta: metav1.ObjectMeta{ + Name: "metrics-test", + }, + Rules: []rbacv1.PolicyRule{ + { + Verbs: []string{"get"}, + NonResourceURLs: []string{path}, + }, + }, + } + if err := c.Create(ctx, cr); err != nil { + return "", cleanup, err + } + createdObjects = append(createdObjects, cr) + + crb := &rbacv1.ClusterRoleBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "metrics-test", + }, + Subjects: []rbacv1.Subject{ + { + Kind: rbacv1.ServiceAccountKind, + Name: "metrics-test", + Namespace: metav1.NamespaceDefault, + }, + }, + RoleRef: rbacv1.RoleRef{ + APIGroup: rbacv1.GroupName, + Kind: "ClusterRole", + Name: "metrics-test", + }, + } + if err := c.Create(ctx, crb); err != nil { + return "", cleanup, err + } + createdObjects = append(createdObjects, crb) + + tokenRequest := &authenticationv1.TokenRequest{ + Spec: authenticationv1.TokenRequestSpec{ + ExpirationSeconds: pointer.Int64(2 * 60 * 60), // 2 hours. + }, + } + if err := c.SubResource("token").Create(ctx, sa, tokenRequest); err != nil { + return "", cleanup, err + } + + if tokenRequest.Status.Token == "" { + return "", cleanup, errors.New("failed to get ServiceAccount token: token should not be empty") + } + + return tokenRequest.Status.Token, cleanup, nil +} + type runnableError struct { } diff --git a/pkg/metrics/listener.go b/pkg/metrics/listener.go index 3bf40ff7c4..cf86af6368 100644 --- a/pkg/metrics/listener.go +++ b/pkg/metrics/listener.go @@ -33,7 +33,7 @@ var log = logf.RuntimeLog.WithName("metrics") var DefaultBindAddress = ":8080" // NewListener creates a new TCP listener bound to the given address. -func NewListener(addr string, insecureServing bool) (net.Listener, error) { +func NewListener(addr string, secureServing bool) (net.Listener, error) { if addr == "" { // If the metrics bind address is empty, default to ":8080" addr = DefaultBindAddress @@ -44,7 +44,7 @@ func NewListener(addr string, insecureServing bool) (net.Listener, error) { return nil, nil } - if insecureServing { + if !secureServing { log.Info("Metrics server is starting to listen", "addr", addr) ln, err := net.Listen("tcp", addr) if err != nil {