Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][monitor] Metrics: allow to configure a set of roles for the scrapers #53

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1522,6 +1522,13 @@ webSocketMaxTextFrameSize=1048576
# Whether the '/metrics' endpoint requires authentication. Defaults to false
authenticateMetricsEndpoint=false

# Whether the '/metrics' endpoint requires authorization. Defaults to true
authorizeMetricsEndpoint=true

# A list of role names (a comma-separated list of strings) able to retrieve metrics
# and system stats from the web server endpoints.
metricsRoles=

# Enable topic level metrics
exposeTopicLevelMetricsInPrometheus=true

Expand Down
7 changes: 7 additions & 0 deletions conf/functions_worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ functionsWorkerEnablePackageManagement: false
# Whether the '/metrics' endpoint requires authentication. Defaults to true
authenticateMetricsEndpoint: true

# Whether the '/metrics' endpoint requires authorization. Defaults to true
authorizeMetricsEndpoint: true

# A list of role names (a comma-separated list of strings) able to retrieve metrics
# and system stats from the web server endpoints.
metricsRoles:

#################################################################
# Function metadata management (assignment, scheduling, and etc)
#################################################################
Expand Down
11 changes: 11 additions & 0 deletions conf/proxy.conf
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ webServicePort=8080
# Port to use to server HTTPS request
webServicePortTls=

# Control whether the proxy-stats endpoint should be enabled or not.
exposeProxyStatsEndpoint=true

# Number of threads used for Netty IO. Default is set to `2 * Runtime.getRuntime().availableProcessors()`
numIOThreads=

Expand Down Expand Up @@ -372,5 +375,13 @@ zooKeeperCacheExpirySeconds=-1

# Whether the '/metrics' endpoint requires authentication. Defaults to true
authenticateMetricsEndpoint=true

# Whether the '/metrics' endpoint requires authorization. Defaults to true
authorizeMetricsEndpoint=true

# A list of role names (a comma-separated list of strings) able to retrieve metrics
# and system stats from the web server endpoints.
metricsRoles=

# Enable cache metrics data, default value is false
metricsBufferResponse=false
10 changes: 10 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -965,6 +965,16 @@ webSocketMaxTextFrameSize=1048576

### --- Metrics --- ###

# Whether the '/metrics' endpoint requires authentication. Defaults to false
authenticateMetricsEndpoint=false

# Whether the '/metrics' endpoint requires authorization. Defaults to true
authorizeMetricsEndpoint=true

# A list of role names (a comma-separated list of strings) able to retrieve metrics
# and system stats from the web server endpoints.
metricsRoles=

# Enable topic level metrics
exposeTopicLevelMetricsInPrometheus=true

Expand Down
1 change: 1 addition & 0 deletions docker/pulsar/scripts/gen-yml-from-env.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
'authenticationProviders',
'superUserRoles',
'proxyRoles',
'metricsRoles',
'schemaRegistryCompatibilityCheckers',
'brokerClientTlsCiphers',
'brokerClientTlsProtocols',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,7 @@ void testAuthentication() throws Exception {
proxyConfig.setAuthenticationProviders(providers);

proxyConfig.setForwardAuthorizationCredentials(true);
AuthenticationService authenticationService = new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig));
ProxyService proxyService = new ProxyService(proxyConfig, authenticationService);
ProxyService proxyService = new ProxyService(proxyConfig);

proxyService.start();
final String proxyServiceUrl = "pulsar://localhost:" + proxyService.getListenPort().get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2831,6 +2831,19 @@ The delayed message index time step(in seconds) in per bucket snapshot segment,
+ "'authenticationEnabled' must also be set for this to take effect."
)
private boolean authenticateMetricsEndpoint = false;
@FieldContext(
category = CATEGORY_METRICS,
doc = "Whether the '/metrics' endpoint requires authorization. Defaults to true."
+ "'authenticateMetricsEndpoint' must also be set for this to take effect."
+ "The super user roles and metrics roles will be allowed for the metrics endpoints."
)
private boolean authorizeMetricsEndpoint = true;
@FieldContext(
category = CATEGORY_METRICS,
doc = "A list of role names (a comma-separated list of strings) able to retrieve metrics"
+ " and system stats from the web server endpoints."
)
private Set<String> metricsRoles = new TreeSet<>();
@FieldContext(
category = CATEGORY_METRICS,
doc = "If true, export topic level metrics otherwise namespace level"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -815,4 +815,38 @@ public Boolean allowTopicOperation(TopicName topicName,
throw new RestException(e.getCause());
}
}

public CompletableFuture<Boolean> allowToScrapeMetrics(AuthenticationParameters authParams) {
if (!isValidOriginalPrincipal(authParams)) {
return CompletableFuture.completedFuture(false);
}
if (isMetricsRole(authParams.getOriginalPrincipal())) {
return CompletableFuture.completedFuture(true);
} else {
return isSuperUser(authParams);
}
}

public CompletableFuture<Boolean> allowToScrapeMetrics(String user,
AuthenticationDataSource authenticationData) {
if (!conf.isAuthorizationEnabled() || isMetricsRole(user)) {
return CompletableFuture.completedFuture(true);
} else {
return isSuperUser(user, authenticationData);
}
}

private boolean isMetricsRole(String role) {
if (log.isDebugEnabled()) {
log.debug("Check if role {} is a metrics role", role);
}
if (!conf.isAuthenticateMetricsEndpoint() || !conf.isAuthorizeMetricsEndpoint()) {
return true;
}
final Set<String> metricsRoles = conf.getMetricsRoles();
if (metricsRoles != null && metricsRoles.contains(role)) {
return true;
}
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.web;

import java.io.IOException;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Servlet filter that authorize the request only if the client role is a metric role or a super user.
*/
public class MetricsRoleBasedAuthorizationFilter implements Filter {
private static final Logger LOG = LoggerFactory.getLogger(MetricsRoleBasedAuthorizationFilter.class);

private final AuthorizationService authorizationService;

public MetricsRoleBasedAuthorizationFilter(AuthorizationService authorizationService) {
this.authorizationService = authorizationService;
}

protected String getAuthenticatedRole(HttpServletRequest request) {
return (String) request
.getAttribute(AuthenticationFilter.AuthenticatedRoleAttributeName);
}

protected AuthenticationDataSource getAuthenticatedDataSource(HttpServletRequest request) {
return (AuthenticationDataSource) request
.getAttribute(AuthenticationFilter.AuthenticatedDataAttributeName);
}

@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {
try {
final HttpServletRequest httpRequest = (HttpServletRequest) request;
final String role = getAuthenticatedRole(httpRequest);
final AuthenticationDataSource authenticatedDataSource = getAuthenticatedDataSource(httpRequest);

boolean authorized = authorizationService.allowToScrapeMetrics(role, authenticatedDataSource)
.join();
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Role {} authorized to scrape metrics: {}", httpRequest.getRemoteAddr(), role, authorized);
}
if (authorized) {
chain.doFilter(request, response);
} else {
HttpServletResponse httpResponse = (HttpServletResponse) response;
httpResponse.sendError(HttpServletResponse.SC_FORBIDDEN, "Forbidden");
LOG.warn("[{}] Failed to authorize HTTP request, role {} is not allowed for uri {}",
request.getRemoteAddr(),
role,
httpRequest.getRequestURI());
}
} catch (Exception e) {
HttpServletResponse httpResponse = (HttpServletResponse) response;
httpResponse.sendError(HttpServletResponse.SC_FORBIDDEN, "Forbidden");
LOG.error("[{}] Error performing authorization for HTTP", request.getRemoteAddr(), e);
}
}

@Override
public void init(FilterConfig arg) throws ServletException {
// No init necessary.
}

@Override
public void destroy() {
// No state to clean up.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -999,9 +999,12 @@ private void addWebServerHandlers(WebService webService,
true, attributeMap, true, Topics.class);

// Add metrics servlet
final boolean authenticateMetricsEndpoint = config.isAuthenticateMetricsEndpoint();
webService.addServlet("/metrics",
new ServletHolder(metricsServlet),
config.isAuthenticateMetricsEndpoint(), attributeMap);
authenticateMetricsEndpoint,
authenticateMetricsEndpoint,
attributeMap);

// Add websocket service
addWebSocketServiceHandler(webService, attributeMap, config);
Expand Down Expand Up @@ -1882,6 +1885,7 @@ public static WorkerConfig initializeWorkerConfigFromBrokerConfig(ServiceConfigu
workerConfig.setSuperUserRoles(brokerConfig.getSuperUserRoles());
workerConfig.setProxyRoles(brokerConfig.getProxyRoles());
workerConfig.setFunctionsWorkerEnablePackageManagement(brokerConfig.isFunctionsWorkerEnablePackageManagement());
workerConfig.setMetricsRoles(brokerConfig.getMetricsRoles());

// inherit the nar package locations
if (isBlank(workerConfig.getFunctionsWorkerServiceNarPackage())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@
import java.util.Map;
import java.util.Optional;
import javax.servlet.DispatcherType;
import javax.servlet.http.HttpServletRequest;
import lombok.Getter;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.jetty.tls.JettySslContextFactory;
import org.eclipse.jetty.server.ConnectionLimit;
import org.eclipse.jetty.server.Handler;
Expand Down Expand Up @@ -200,6 +203,7 @@ private void addResourceServlet(String basePath, boolean requiresAuthentication,
private static class FilterInitializer {
private final List<FilterHolder> filterHolders = new ArrayList<>();
private final FilterHolder authenticationFilterHolder;
private final FilterHolder metricsAuthorizationFilterHolder;
FilterInitializer(PulsarService pulsarService) {
ServiceConfiguration config = pulsarService.getConfiguration();
if (config.getMaxConcurrentHttpRequests() > 0) {
Expand Down Expand Up @@ -227,8 +231,19 @@ private static class FilterInitializer {
authenticationFilterHolder = new FilterHolder(new AuthenticationFilter(
pulsarService.getBrokerService().getAuthenticationService()));
filterHolders.add(authenticationFilterHolder);
if (config.isAuthorizationEnabled()) {
final AuthorizationService authorizationService = pulsarService
.getBrokerService().getAuthorizationService();
metricsAuthorizationFilterHolder = new FilterHolder(
new MetricsRoleBasedAuthorizationFilter(authorizationService)
);
filterHolders.add(metricsAuthorizationFilterHolder);
} else {
metricsAuthorizationFilterHolder = null;
}
} else {
authenticationFilterHolder = null;
metricsAuthorizationFilterHolder = null;
}

if (config.isDisableHttpDebugMethods()) {
Expand All @@ -246,19 +261,30 @@ private static class FilterInitializer {
}
}

public void addFilters(ServletContextHandler context, boolean requiresAuthentication) {
public void addFilters(ServletContextHandler context, boolean requiresAuthentication,
boolean requiresMetricsAuthorization) {
for (FilterHolder filterHolder : filterHolders) {
if (requiresAuthentication || filterHolder != authenticationFilterHolder) {
context.addFilter(filterHolder,
MATCH_ALL, EnumSet.allOf(DispatcherType.class));
if (filterHolder == authenticationFilterHolder && !requiresAuthentication) {
continue;
}
if (filterHolder == metricsAuthorizationFilterHolder && !requiresMetricsAuthorization) {
continue;
}
context.addFilter(filterHolder, MATCH_ALL, EnumSet.allOf(DispatcherType.class));
}
}

}

public void addServlet(String path, ServletHolder servletHolder, boolean requiresAuthentication,
Map<String, Object> attributeMap) {
addServlet(path, servletHolder, requiresAuthentication, false, attributeMap);
}

public void addServlet(String path, ServletHolder servletHolder,
boolean requiresAuthentication,
boolean requiresMetricsAuthorization,
Map<String, Object> attributeMap) {
ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
// Notice: each context path should be unique, but there's nothing here to verify that
context.setContextPath(path);
Expand All @@ -268,7 +294,7 @@ public void addServlet(String path, ServletHolder servletHolder, boolean require
context.setAttribute(key, value);
});
}
filterInitializer.addFilters(context, requiresAuthentication);
filterInitializer.addFilters(context, requiresAuthentication, requiresMetricsAuthorization);
handlers.add(context);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

public class MockAuthentication implements Authentication {
private static final Logger log = LoggerFactory.getLogger(MockAuthentication.class);
public static final String HTTP_HEADER_USER = "mockuser";
private final String user;

public MockAuthentication(String user) {
Expand All @@ -52,7 +53,7 @@ public AuthenticationDataProvider getAuthData() throws PulsarClientException {
public String getHttpAuthType() { return "mock"; }
@Override
public Set<Map.Entry<String, String>> getHttpHeaders() {
return Map.of("mockuser", user).entrySet();
return Map.of(HTTP_HEADER_USER, user).entrySet();
}
@Override
public boolean hasDataFromCommand() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public String getAuthMethodName() {
public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
String principal = "unknown";
if (authData.hasDataFromHttp()) {
principal = authData.getHttpHeader("mockuser");
principal = authData.getHttpHeader(MockAuthentication.HTTP_HEADER_USER);
} else if (authData.hasDataFromCommand()) {
principal = authData.getCommandData();
}
Expand Down
Loading