Skip to content

Commit

Permalink
[improve][proxy] Support disabling metrics endpoint (#21031)
Browse files Browse the repository at this point in the history
(cherry picked from commit d06cda6)
  • Loading branch information
michaeljmarshall committed Aug 24, 2023
1 parent e103822 commit c644849
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 17 deletions.
2 changes: 2 additions & 0 deletions conf/proxy.conf
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,8 @@ zooKeeperCacheExpirySeconds=-1

### --- Metrics --- ###

# Whether to enable the proxy's /metrics, /proxy-stats, and /status.html http endpoints
enableProxyStatsEndpoints=true
# Whether the '/metrics' endpoint requires authentication. Defaults to true
authenticateMetricsEndpoint=true
# Enable cache metrics data, default value is false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,12 @@ public class ProxyConfiguration implements PulsarConfiguration {
+ "to take effect"
)
private boolean forwardAuthorizationCredentials = false;
@FieldContext(
category = CATEGORY_HTTP,
doc = "Whether to enable the proxy's /metrics, /proxy-stats, and /status.html http endpoints"
)
private boolean enableProxyStatsEndpoints = true;

@FieldContext(
category = CATEGORY_AUTHENTICATION,
doc = "Whether the '/metrics' endpoint requires authentication. Defaults to true."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,15 +247,19 @@ public static void addWebServerHandlers(WebServer server,
ProxyConfiguration config,
ProxyService service,
BrokerDiscoveryProvider discoveryProvider) throws Exception {
if (service != null) {
PrometheusMetricsServlet metricsServlet = service.getMetricsServlet();
if (metricsServlet != null) {
server.addServlet("/metrics", new ServletHolder(metricsServlet),
Collections.emptyList(), config.isAuthenticateMetricsEndpoint());
if (config.isEnableProxyStatsEndpoints()) {
server.addRestResource("/", VipStatus.ATTRIBUTE_STATUS_FILE_PATH, config.getStatusFilePath(),
VipStatus.class);
server.addRestResource("/proxy-stats", ProxyStats.ATTRIBUTE_PULSAR_PROXY_NAME, service,
ProxyStats.class);
if (service != null) {
PrometheusMetricsServlet metricsServlet = service.getMetricsServlet();
if (metricsServlet != null) {
server.addServlet("/metrics", new ServletHolder(metricsServlet),
Collections.emptyList(), config.isAuthenticateMetricsEndpoint());
}
}
}
server.addRestResource("/", VipStatus.ATTRIBUTE_STATUS_FILE_PATH, config.getStatusFilePath(), VipStatus.class);
server.addRestResource("/proxy-stats", ProxyStats.ATTRIBUTE_PULSAR_PROXY_NAME, service, ProxyStats.class);

AdminProxyHandler adminProxyHandler = new AdminProxyHandler(config, discoveryProvider);
ServletHolder servletHolder = new ServletHolder(adminProxyHandler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,18 @@ public void addServlet(String basePath, ServletHolder servletHolder, List<Pair<S

public void addServlet(String basePath, ServletHolder servletHolder,
List<Pair<String, Object>> attributes, boolean requireAuthentication) {
Optional<String> existingPath = servletPaths.stream().filter(p -> p.startsWith(basePath)).findFirst();
if (existingPath.isPresent()) {
throw new IllegalArgumentException(
String.format("Cannot add servlet at %s, path %s already exists", basePath, existingPath.get()));
addServlet(basePath, servletHolder, attributes, requireAuthentication, true);
}

private void addServlet(String basePath, ServletHolder servletHolder,
List<Pair<String, Object>> attributes, boolean requireAuthentication, boolean checkForExistingPaths) {
if (checkForExistingPaths) {
Optional<String> existingPath = servletPaths.stream().filter(p -> p.startsWith(basePath)).findFirst();
if (existingPath.isPresent()) {
throw new IllegalArgumentException(
String.format("Cannot add servlet at %s, path %s already exists", basePath,
existingPath.get()));
}
}
servletPaths.add(basePath);

Expand All @@ -220,11 +228,9 @@ public void addRestResource(String basePath, String attribute, Object attributeV
config.register(JsonMapperProvider.class);
ServletHolder servletHolder = new ServletHolder(new ServletContainer(config));
servletHolder.setAsyncSupported(true);
ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
context.setContextPath(basePath);
context.addServlet(servletHolder, MATCH_ALL);
context.setAttribute(attribute, attributeValue);
handlers.add(context);
// This method has not historically checked for existing paths, so we don't check here either. The
// method call is added to reduce code duplication.
addServlet(basePath, servletHolder, Collections.singletonList(Pair.of(attribute, attributeValue)), true, false);
}

public int getExternalServicePort() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.proxy.stats;

import static java.util.concurrent.TimeUnit.SECONDS;
import io.netty.channel.Channel;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
Expand All @@ -27,7 +28,10 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
Expand All @@ -36,7 +40,12 @@
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response.Status;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationParameters;
import org.apache.pulsar.broker.web.AuthenticationFilter;
import org.apache.pulsar.proxy.server.ProxyService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;



Expand All @@ -45,19 +54,23 @@
@Produces(MediaType.APPLICATION_JSON)
public class ProxyStats {

private static final Logger log = LoggerFactory.getLogger(ProxyStats.class);
public static final String ATTRIBUTE_PULSAR_PROXY_NAME = "pulsar-proxy";

private ProxyService service;

@Context
protected ServletContext servletContext;
@Context
protected HttpServletRequest httpRequest;

@GET
@Path("/connections")
@ApiOperation(value = "Proxy stats api to get info for live connections",
response = List.class, responseContainer = "List")
@ApiResponses(value = { @ApiResponse(code = 503, message = "Proxy service is not initialized") })
public List<ConnectionStats> metrics() {
throwIfNotSuperUser("metrics");
List<ConnectionStats> stats = new ArrayList<>();
proxyService().getClientCnxs().forEach(cnx -> {
if (cnx.getDirectProxyHandler() == null) {
Expand All @@ -78,7 +91,7 @@ public List<ConnectionStats> metrics() {
@ApiResponses(value = { @ApiResponse(code = 412, message = "Proxy logging should be > 2 to capture topic stats"),
@ApiResponse(code = 503, message = "Proxy service is not initialized") })
public Map<String, TopicStats> topics() {

throwIfNotSuperUser("topics");
Optional<Integer> logLevel = proxyService().getConfiguration().getProxyLogLevel();
if (!logLevel.isPresent() || logLevel.get() < 2) {
throw new RestException(Status.PRECONDITION_FAILED, "Proxy doesn't have logging level 2");
Expand All @@ -92,6 +105,7 @@ public Map<String, TopicStats> topics() {
notes = "It only changes log-level in memory, change it config file to persist the change")
@ApiResponses(value = { @ApiResponse(code = 412, message = "Proxy log level can be [0-2]"), })
public void updateProxyLogLevel(@PathParam("logLevel") int logLevel) {
throwIfNotSuperUser("updateProxyLogLevel");
if (logLevel < 0 || logLevel > 2) {
throw new RestException(Status.PRECONDITION_FAILED, "Proxy log level can be only [0-2]");
}
Expand All @@ -102,6 +116,7 @@ public void updateProxyLogLevel(@PathParam("logLevel") int logLevel) {
@Path("/logging")
@ApiOperation(hidden = true, value = "Get proxy logging")
public int getProxyLogLevel(@PathParam("logLevel") int logLevel) {
throwIfNotSuperUser("getProxyLogLevel");
return proxyService().getProxyLogLevel();
}

Expand All @@ -114,4 +129,26 @@ protected ProxyService proxyService() {
}
return service;
}

private void throwIfNotSuperUser(String action) {
if (proxyService().getConfiguration().isAuthorizationEnabled()) {
AuthenticationParameters authParams = AuthenticationParameters.builder()
.clientRole((String) httpRequest.getAttribute(AuthenticationFilter.AuthenticatedRoleAttributeName))
.clientAuthenticationDataSource((AuthenticationDataSource)
httpRequest.getAttribute(AuthenticationFilter.AuthenticatedDataAttributeName))
.build();
try {
if (authParams.getClientRole() == null
|| !proxyService().getAuthorizationService().isSuperUser(authParams).get(30, SECONDS)) {
log.error("Client with role [{}] is not authorized to {}", authParams.getClientRole(), action);
throw new org.apache.pulsar.common.util.RestException(Status.UNAUTHORIZED,
"Client is not authorized to perform operation");
}
} catch (ExecutionException | TimeoutException | InterruptedException e) {
log.warn("Time-out {} sec while checking the role {} is a super user role ", 30,
authParams.getClientRole());
throw new org.apache.pulsar.common.util.RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
}
}
}
}

0 comments on commit c644849

Please sign in to comment.