diff --git a/conf/proxy.conf b/conf/proxy.conf index 9c63d3eda7adc..1902247e551f4 100644 --- a/conf/proxy.conf +++ b/conf/proxy.conf @@ -335,3 +335,8 @@ zookeeperSessionTimeoutMs=-1 # ZooKeeper cache expiry time in seconds # Deprecated: use metadataStoreCacheExpirySeconds zooKeeperCacheExpirySeconds=-1 + +### --- Metrics --- ### + +# Whether to enable the proxy's /metrics, /proxy-stats, and /status.html http endpoints +enableProxyStatsEndpoints=true diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java index c28c7e635f0fb..b13d3e8b9e97a 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java @@ -342,6 +342,12 @@ public class ProxyConfiguration implements PulsarConfiguration { + "to take effect" ) private boolean forwardAuthorizationCredentials = false; + @FieldContext( + category = CATEGORY_HTTP, + doc = "Whether to enable the proxy's /metrics, /proxy-stats, and /status.html http endpoints" + ) + private boolean enableProxyStatsEndpoints = true; + @FieldContext( category = CATEGORY_AUTHENTICATION, doc = "Whether the '/metrics' endpoint requires authentication. Defaults to true." diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java index d042323c74eb7..6bbe6691be3c8 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java @@ -241,17 +241,19 @@ public static void addWebServerHandlers(WebServer server, ProxyConfiguration config, ProxyService service, BrokerDiscoveryProvider discoveryProvider) throws Exception { - if (service != null) { - PrometheusMetricsServlet metricsServlet = service.getMetricsServlet(); - if (metricsServlet != null) { - server.addServlet("/metrics", new ServletHolder(metricsServlet), - Collections.emptyList(), config.isAuthenticateMetricsEndpoint()); + if (config.isEnableProxyStatsEndpoints()) { + server.addRestResources("/", VipStatus.class.getPackage().getName(), + VipStatus.ATTRIBUTE_STATUS_FILE_PATH, config.getStatusFilePath()); + server.addRestResources("/proxy-stats", ProxyStats.class.getPackage().getName(), + ProxyStats.ATTRIBUTE_PULSAR_PROXY_NAME, service); + if (service != null) { + PrometheusMetricsServlet metricsServlet = service.getMetricsServlet(); + if (metricsServlet != null) { + server.addServlet("/metrics", new ServletHolder(metricsServlet), + Collections.emptyList(), config.isAuthenticateMetricsEndpoint()); + } } } - server.addRestResources("/", VipStatus.class.getPackage().getName(), - VipStatus.ATTRIBUTE_STATUS_FILE_PATH, config.getStatusFilePath()); - server.addRestResources("/proxy-stats", ProxyStats.class.getPackage().getName(), - ProxyStats.ATTRIBUTE_PULSAR_PROXY_NAME, service); AdminProxyHandler adminProxyHandler = new AdminProxyHandler(config, discoveryProvider); ServletHolder servletHolder = new ServletHolder(adminProxyHandler); diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java index 35f717361f238..4269ac3d966fe 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java @@ -151,10 +151,18 @@ public void addServlet(String basePath, ServletHolder servletHolder, List> attributes, boolean requireAuthentication) { - Optional existingPath = servletPaths.stream().filter(p -> p.startsWith(basePath)).findFirst(); - if (existingPath.isPresent()) { - throw new IllegalArgumentException( - String.format("Cannot add servlet at %s, path %s already exists", basePath, existingPath.get())); + addServlet(basePath, servletHolder, attributes, requireAuthentication, true); + } + + private void addServlet(String basePath, ServletHolder servletHolder, + List> attributes, boolean requireAuthentication, boolean checkForExistingPaths) { + if (checkForExistingPaths) { + Optional existingPath = servletPaths.stream().filter(p -> p.startsWith(basePath)).findFirst(); + if (existingPath.isPresent()) { + throw new IllegalArgumentException( + String.format("Cannot add servlet at %s, path %s already exists", basePath, + existingPath.get())); + } } servletPaths.add(basePath); @@ -183,11 +191,9 @@ public void addRestResources(String basePath, String javaPackages, String attrib config.register(JsonMapperProvider.class); ServletHolder servletHolder = new ServletHolder(new ServletContainer(config)); servletHolder.setAsyncSupported(true); - ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); - context.setContextPath(basePath); - context.addServlet(servletHolder, "/*"); - context.setAttribute(attribute, attributeValue); - handlers.add(context); + // This method has not historically checked for existing paths, so we don't check here either. The + // method call is added to reduce code duplication. + addServlet(basePath, servletHolder, Collections.singletonList(Pair.of(attribute, attributeValue)), true, false); } public int getExternalServicePort() { diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/ProxyStats.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/ProxyStats.java index d0213899046c8..4a2bd92842f37 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/ProxyStats.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/ProxyStats.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.proxy.stats; +import static java.util.concurrent.TimeUnit.SECONDS; import io.netty.channel.Channel; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; @@ -27,7 +28,10 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import javax.servlet.ServletContext; +import javax.servlet.http.HttpServletRequest; import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.Path; @@ -36,7 +40,12 @@ import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response.Status; +import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.authentication.AuthenticationParameters; +import org.apache.pulsar.broker.web.AuthenticationFilter; import org.apache.pulsar.proxy.server.ProxyService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @@ -45,12 +54,15 @@ @Produces(MediaType.APPLICATION_JSON) public class ProxyStats { + private static final Logger log = LoggerFactory.getLogger(ProxyStats.class); public static final String ATTRIBUTE_PULSAR_PROXY_NAME = "pulsar-proxy"; private ProxyService service; @Context protected ServletContext servletContext; + @Context + protected HttpServletRequest httpRequest; @GET @Path("/connections") @@ -58,6 +70,7 @@ public class ProxyStats { response = List.class, responseContainer = "List") @ApiResponses(value = { @ApiResponse(code = 503, message = "Proxy service is not initialized") }) public List metrics() { + throwIfNotSuperUser("metrics"); List stats = new ArrayList<>(); proxyService().getClientCnxs().forEach(cnx -> { if (cnx.getDirectProxyHandler() == null) { @@ -78,7 +91,7 @@ public List metrics() { @ApiResponses(value = { @ApiResponse(code = 412, message = "Proxy logging should be > 2 to capture topic stats"), @ApiResponse(code = 503, message = "Proxy service is not initialized") }) public Map topics() { - + throwIfNotSuperUser("topics"); Optional logLevel = proxyService().getConfiguration().getProxyLogLevel(); if (!logLevel.isPresent() || logLevel.get() < 2) { throw new RestException(Status.PRECONDITION_FAILED, "Proxy doesn't have logging level 2"); @@ -92,6 +105,7 @@ public Map topics() { notes = "It only changes log-level in memory, change it config file to persist the change") @ApiResponses(value = { @ApiResponse(code = 412, message = "Proxy log level can be [0-2]"), }) public void updateProxyLogLevel(@PathParam("logLevel") int logLevel) { + throwIfNotSuperUser("updateProxyLogLevel"); if (logLevel < 0 || logLevel > 2) { throw new RestException(Status.PRECONDITION_FAILED, "Proxy log level can be only [0-2]"); } @@ -102,6 +116,7 @@ public void updateProxyLogLevel(@PathParam("logLevel") int logLevel) { @Path("/logging") @ApiOperation(hidden = true, value = "Get proxy logging") public int getProxyLogLevel(@PathParam("logLevel") int logLevel) { + throwIfNotSuperUser("getProxyLogLevel"); return proxyService().getProxyLogLevel(); } @@ -114,4 +129,26 @@ protected ProxyService proxyService() { } return service; } + + private void throwIfNotSuperUser(String action) { + if (proxyService().getConfiguration().isAuthorizationEnabled()) { + AuthenticationParameters authParams = AuthenticationParameters.builder() + .clientRole((String) httpRequest.getAttribute(AuthenticationFilter.AuthenticatedRoleAttributeName)) + .clientAuthenticationDataSource((AuthenticationDataSource) + httpRequest.getAttribute(AuthenticationFilter.AuthenticatedDataAttributeName)) + .build(); + try { + if (authParams.getClientRole() == null + || !proxyService().getAuthorizationService().isSuperUser(authParams).get(30, SECONDS)) { + log.error("Client with role [{}] is not authorized to {}", authParams.getClientRole(), action); + throw new org.apache.pulsar.common.util.RestException(Status.UNAUTHORIZED, + "Client is not authorized to perform operation"); + } + } catch (ExecutionException | TimeoutException | InterruptedException e) { + log.warn("Time-out {} sec while checking the role {} is a super user role ", 30, + authParams.getClientRole()); + throw new org.apache.pulsar.common.util.RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage()); + } + } + } }