From 7fe7cea74d5e0a6ce1b982ba3cb9b4808db7b5ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Fri, 5 May 2023 16:11:28 +0200 Subject: [PATCH] move the logic in the authz service --- .../pulsar/broker/ServiceConfiguration.java | 7 +++++ .../authorization/AuthorizationService.java | 31 +++++++++++++++++++ .../FixedRolesBasedAuthorizationFilter.java | 13 ++------ .../functions/worker/rest/api/WorkerImpl.java | 24 ++++++++++++-- 4 files changed, 62 insertions(+), 13 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 0dbc25f7662c87..0a1f49fde95621 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2826,6 +2826,13 @@ The delayed message index time step(in seconds) in per bucket snapshot segment, + "'authenticationEnabled' must also be set for this to take effect." ) private boolean authenticateMetricsEndpoint = false; + @FieldContext( + category = CATEGORY_METRICS, + doc = "Whether the '/metrics' endpoint requires authorization. Defaults to true." + + "'authenticateMetricsEndpoint' must also be set for this to take effect." + + "The super user roles and metrics roles will be allowed for the metrics endpoints." + ) + private boolean authorizeMetricsEndpoint = true; @FieldContext( category = CATEGORY_METRICS, doc = "A list of role names (a comma-separated list of strings) able to retrieve metrics" diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java index 0c61219b57a50c..1d90e2e4f2b971 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java @@ -793,4 +793,35 @@ public Boolean allowTopicOperation(TopicName topicName, throw new RestException(e.getCause()); } } + + public CompletableFuture allowToScrapeMetrics(AuthenticationParameters authParams) { + if (!isValidOriginalPrincipal(authParams)) { + return CompletableFuture.completedFuture(false); + } + if (isMetricsRole(authParams.getOriginalPrincipal())) { + return CompletableFuture.completedFuture(true); + } else { + return isSuperUser(authParams); + } + } + + public CompletableFuture allowToScrapeMetrics(String user, + AuthenticationDataSource authenticationData) { + if (isMetricsRole(user)) { + return CompletableFuture.completedFuture(true); + } else { + return isSuperUser(user, authenticationData); + } + } + + private boolean isMetricsRole(String user) { + if (!this.conf.isAuthenticateMetricsEndpoint() || this.conf.isAuthorizeMetricsEndpoint()) { + return true; + } + final Set metricsRoles = conf.getMetricsRoles(); + if (metricsRoles != null && metricsRoles.contains(user)) { + return true; + } + return false; + } } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/FixedRolesBasedAuthorizationFilter.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/FixedRolesBasedAuthorizationFilter.java index 5c2fb949d4fd50..b90ae701cc5b91 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/FixedRolesBasedAuthorizationFilter.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/FixedRolesBasedAuthorizationFilter.java @@ -62,7 +62,8 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha final String role = getAuthenticatedRole(httpRequest); final AuthenticationDataSource authenticatedDataSource = getAuthenticatedDataSource(httpRequest); - boolean authorized = isAuthorized(role, authenticatedDataSource); + boolean authorized = authorizationService.allowToScrapeMetrics(role, authenticatedDataSource) + .join(); if (authorized) { chain.doFilter(request, response); } else { @@ -80,16 +81,6 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha } } - private boolean isAuthorized(String role, AuthenticationDataSource authenticatedDataSource) { - if (StringUtils.isBlank(role)) { - return false; - } - if (roles != null && roles.contains(role)) { - return true; - } - return authorizationService.isSuperUser(role, authenticatedDataSource).join(); - } - @Override public void init(FilterConfig arg) throws ServletException { // No init necessary. diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java index c3755f627525e8..346687426cedb4 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java @@ -128,6 +128,26 @@ public Map> getAssignments(AuthenticationParameters a return ret; } + private void throwIfNotAuthorizedToScrapeMetrics(AuthenticationParameters authParams, String action) { + if (worker().getWorkerConfig().isAuthorizationEnabled()) { + return; + } + + try { + if (authParams.getClientRole() == null || !worker().getAuthorizationService().allowToScrapeMetrics(authParams) + .get(worker().getWorkerConfig().getMetadataStoreOperationTimeoutSeconds(), SECONDS)) { + log.error("Client with role [{}] and originalPrincipal [{}] is not authorized to {}", + authParams.getClientRole(), authParams.getOriginalPrincipal(), action); + throw new RestException(Status.UNAUTHORIZED, "Client is not authorized to perform operation"); + } + } catch (ExecutionException | TimeoutException | InterruptedException e) { + log.warn("Time-out {} sec while checking the role {} originalPrincipal {} is a allowed to scrape metrics", + worker().getWorkerConfig().getMetadataStoreOperationTimeoutSeconds(), + authParams.getClientRole(), authParams.getOriginalPrincipal()); + throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage()); + } + } + private void throwIfNotSuperUser(AuthenticationParameters authParams, String action) { if (worker().getWorkerConfig().isAuthorizationEnabled()) { try { @@ -151,7 +171,7 @@ public List getWorkerMetrics(final Authe if (!isWorkerServiceAvailable() || worker().getMetricsGenerator() == null) { throwUnavailableException(); } - throwIfNotSuperUser(authParams, "get worker stats"); + throwIfNotAuthorizedToScrapeMetrics(authParams, "get worker stats"); return worker().getMetricsGenerator().generate(); } @@ -162,7 +182,7 @@ public List getFunctionsMetrics(AuthenticationParam throwUnavailableException(); } - throwIfNotSuperUser(authParams, "get function stats"); + throwIfNotAuthorizedToScrapeMetrics(authParams, "get function stats"); Map functionRuntimes = worker().getFunctionRuntimeManager() .getFunctionRuntimeInfos();