Skip to content

Commit

Permalink
move the logic in the authz service
Browse files Browse the repository at this point in the history
  • Loading branch information
nicoloboschi committed Jun 5, 2023
1 parent c2f50a6 commit 7fe7cea
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2826,6 +2826,13 @@ The delayed message index time step(in seconds) in per bucket snapshot segment,
+ "'authenticationEnabled' must also be set for this to take effect."
)
private boolean authenticateMetricsEndpoint = false;
@FieldContext(
category = CATEGORY_METRICS,
doc = "Whether the '/metrics' endpoint requires authorization. Defaults to true."
+ "'authenticateMetricsEndpoint' must also be set for this to take effect." +
"The super user roles and metrics roles will be allowed for the metrics endpoints."
)
private boolean authorizeMetricsEndpoint = true;
@FieldContext(
category = CATEGORY_METRICS,
doc = "A list of role names (a comma-separated list of strings) able to retrieve metrics"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -793,4 +793,35 @@ public Boolean allowTopicOperation(TopicName topicName,
throw new RestException(e.getCause());
}
}

public CompletableFuture<Boolean> allowToScrapeMetrics(AuthenticationParameters authParams) {
if (!isValidOriginalPrincipal(authParams)) {
return CompletableFuture.completedFuture(false);
}
if (isMetricsRole(authParams.getOriginalPrincipal())) {
return CompletableFuture.completedFuture(true);
} else {
return isSuperUser(authParams);
}
}

public CompletableFuture<Boolean> allowToScrapeMetrics(String user,
AuthenticationDataSource authenticationData) {
if (isMetricsRole(user)) {
return CompletableFuture.completedFuture(true);
} else {
return isSuperUser(user, authenticationData);
}
}

private boolean isMetricsRole(String user) {
if (!this.conf.isAuthenticateMetricsEndpoint() || this.conf.isAuthorizeMetricsEndpoint()) {
return true;
}
final Set<String> metricsRoles = conf.getMetricsRoles();
if (metricsRoles != null && metricsRoles.contains(user)) {
return true;
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha
final String role = getAuthenticatedRole(httpRequest);
final AuthenticationDataSource authenticatedDataSource = getAuthenticatedDataSource(httpRequest);

boolean authorized = isAuthorized(role, authenticatedDataSource);
boolean authorized = authorizationService.allowToScrapeMetrics(role, authenticatedDataSource)
.join();
if (authorized) {
chain.doFilter(request, response);
} else {
Expand All @@ -80,16 +81,6 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha
}
}

private boolean isAuthorized(String role, AuthenticationDataSource authenticatedDataSource) {
if (StringUtils.isBlank(role)) {
return false;
}
if (roles != null && roles.contains(role)) {
return true;
}
return authorizationService.isSuperUser(role, authenticatedDataSource).join();
}

@Override
public void init(FilterConfig arg) throws ServletException {
// No init necessary.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,26 @@ public Map<String, Collection<String>> getAssignments(AuthenticationParameters a
return ret;
}

private void throwIfNotAuthorizedToScrapeMetrics(AuthenticationParameters authParams, String action) {
if (worker().getWorkerConfig().isAuthorizationEnabled()) {
return;
}

try {
if (authParams.getClientRole() == null || !worker().getAuthorizationService().allowToScrapeMetrics(authParams)
.get(worker().getWorkerConfig().getMetadataStoreOperationTimeoutSeconds(), SECONDS)) {
log.error("Client with role [{}] and originalPrincipal [{}] is not authorized to {}",
authParams.getClientRole(), authParams.getOriginalPrincipal(), action);
throw new RestException(Status.UNAUTHORIZED, "Client is not authorized to perform operation");
}
} catch (ExecutionException | TimeoutException | InterruptedException e) {
log.warn("Time-out {} sec while checking the role {} originalPrincipal {} is a allowed to scrape metrics",
worker().getWorkerConfig().getMetadataStoreOperationTimeoutSeconds(),
authParams.getClientRole(), authParams.getOriginalPrincipal());
throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
}
}

private void throwIfNotSuperUser(AuthenticationParameters authParams, String action) {
if (worker().getWorkerConfig().isAuthorizationEnabled()) {
try {
Expand All @@ -151,7 +171,7 @@ public List<org.apache.pulsar.common.stats.Metrics> getWorkerMetrics(final Authe
if (!isWorkerServiceAvailable() || worker().getMetricsGenerator() == null) {
throwUnavailableException();
}
throwIfNotSuperUser(authParams, "get worker stats");
throwIfNotAuthorizedToScrapeMetrics(authParams, "get worker stats");
return worker().getMetricsGenerator().generate();
}

Expand All @@ -162,7 +182,7 @@ public List<WorkerFunctionInstanceStats> getFunctionsMetrics(AuthenticationParam
throwUnavailableException();
}

throwIfNotSuperUser(authParams, "get function stats");
throwIfNotAuthorizedToScrapeMetrics(authParams, "get function stats");

Map<String, FunctionRuntimeInfo> functionRuntimes = worker().getFunctionRuntimeManager()
.getFunctionRuntimeInfos();
Expand Down

0 comments on commit 7fe7cea

Please sign in to comment.