diff --git a/conf/broker.conf b/conf/broker.conf index 22ca71864e9cd..94afe0ba472a1 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1522,6 +1522,13 @@ webSocketMaxTextFrameSize=1048576 # Whether the '/metrics' endpoint requires authentication. Defaults to false authenticateMetricsEndpoint=false +# Whether the '/metrics' endpoint requires authorization. Defaults to true +authorizeMetricsEndpoint=true + +# A list of role names (a comma-separated list of strings) able to retrieve metrics +# and system stats from the web server endpoints. +metricsRoles= + # Enable topic level metrics exposeTopicLevelMetricsInPrometheus=true diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml index 4c5b6aab1b7f4..b5f25d4b24ebe 100644 --- a/conf/functions_worker.yml +++ b/conf/functions_worker.yml @@ -61,6 +61,13 @@ functionsWorkerEnablePackageManagement: false # Whether the '/metrics' endpoint requires authentication. Defaults to true authenticateMetricsEndpoint: true +# Whether the '/metrics' endpoint requires authorization. Defaults to true +authorizeMetricsEndpoint: true + +# A list of role names (a comma-separated list of strings) able to retrieve metrics +# and system stats from the web server endpoints. +metricsRoles: + ################################################################# # Function metadata management (assignment, scheduling, and etc) ################################################################# diff --git a/conf/proxy.conf b/conf/proxy.conf index cfc1e47b7c445..70156a42ec876 100644 --- a/conf/proxy.conf +++ b/conf/proxy.conf @@ -77,6 +77,9 @@ webServicePort=8080 # Port to use to server HTTPS request webServicePortTls= +# Control whether the proxy-stats endpoint should be enabled or not. +exposeProxyStatsEndpoint=true + # Number of threads used for Netty IO. Default is set to `2 * Runtime.getRuntime().availableProcessors()` numIOThreads= @@ -372,5 +375,13 @@ zooKeeperCacheExpirySeconds=-1 # Whether the '/metrics' endpoint requires authentication. Defaults to true authenticateMetricsEndpoint=true + +# Whether the '/metrics' endpoint requires authorization. Defaults to true +authorizeMetricsEndpoint=true + +# A list of role names (a comma-separated list of strings) able to retrieve metrics +# and system stats from the web server endpoints. +metricsRoles= + # Enable cache metrics data, default value is false metricsBufferResponse=false diff --git a/conf/standalone.conf b/conf/standalone.conf index 46e6aed76e42a..e68527051d296 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -965,6 +965,16 @@ webSocketMaxTextFrameSize=1048576 ### --- Metrics --- ### +# Whether the '/metrics' endpoint requires authentication. Defaults to false +authenticateMetricsEndpoint=false + +# Whether the '/metrics' endpoint requires authorization. Defaults to true +authorizeMetricsEndpoint=true + +# A list of role names (a comma-separated list of strings) able to retrieve metrics +# and system stats from the web server endpoints. +metricsRoles= + # Enable topic level metrics exposeTopicLevelMetricsInPrometheus=true diff --git a/docker/pulsar/scripts/gen-yml-from-env.py b/docker/pulsar/scripts/gen-yml-from-env.py index ce19436b7e0dd..ef8677996e5ce 100755 --- a/docker/pulsar/scripts/gen-yml-from-env.py +++ b/docker/pulsar/scripts/gen-yml-from-env.py @@ -45,6 +45,7 @@ 'authenticationProviders', 'superUserRoles', 'proxyRoles', + 'metricsRoles', 'schemaRegistryCompatibilityCheckers', 'brokerClientTlsCiphers', 'brokerClientTlsProtocols', diff --git a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java index 261efe680f862..a5a372e464a79 100644 --- a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java +++ b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java @@ -253,9 +253,7 @@ void testAuthentication() throws Exception { proxyConfig.setAuthenticationProviders(providers); proxyConfig.setForwardAuthorizationCredentials(true); - AuthenticationService authenticationService = new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)); - ProxyService proxyService = new ProxyService(proxyConfig, authenticationService); + ProxyService proxyService = new ProxyService(proxyConfig); proxyService.start(); final String proxyServiceUrl = "pulsar://localhost:" + proxyService.getListenPort().get(); diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index a709e49e3a980..f0d34725ea357 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2831,6 +2831,19 @@ The delayed message index time step(in seconds) in per bucket snapshot segment, + "'authenticationEnabled' must also be set for this to take effect." ) private boolean authenticateMetricsEndpoint = false; + @FieldContext( + category = CATEGORY_METRICS, + doc = "Whether the '/metrics' endpoint requires authorization. Defaults to true." + + "'authenticateMetricsEndpoint' must also be set for this to take effect." + + "The super user roles and metrics roles will be allowed for the metrics endpoints." + ) + private boolean authorizeMetricsEndpoint = true; + @FieldContext( + category = CATEGORY_METRICS, + doc = "A list of role names (a comma-separated list of strings) able to retrieve metrics" + + " and system stats from the web server endpoints." + ) + private Set metricsRoles = new TreeSet<>(); @FieldContext( category = CATEGORY_METRICS, doc = "If true, export topic level metrics otherwise namespace level" diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java index 29abcc1eee414..d41db301e3ce4 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java @@ -815,4 +815,38 @@ public Boolean allowTopicOperation(TopicName topicName, throw new RestException(e.getCause()); } } + + public CompletableFuture allowToScrapeMetrics(AuthenticationParameters authParams) { + if (!isValidOriginalPrincipal(authParams)) { + return CompletableFuture.completedFuture(false); + } + if (isMetricsRole(authParams.getOriginalPrincipal())) { + return CompletableFuture.completedFuture(true); + } else { + return isSuperUser(authParams); + } + } + + public CompletableFuture allowToScrapeMetrics(String user, + AuthenticationDataSource authenticationData) { + if (!conf.isAuthorizationEnabled() || isMetricsRole(user)) { + return CompletableFuture.completedFuture(true); + } else { + return isSuperUser(user, authenticationData); + } + } + + private boolean isMetricsRole(String role) { + if (log.isDebugEnabled()) { + log.debug("Check if role {} is a metrics role", role); + } + if (!conf.isAuthenticateMetricsEndpoint() || !conf.isAuthorizeMetricsEndpoint()) { + return true; + } + final Set metricsRoles = conf.getMetricsRoles(); + if (metricsRoles != null && metricsRoles.contains(role)) { + return true; + } + return false; + } } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/MetricsRoleBasedAuthorizationFilter.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/MetricsRoleBasedAuthorizationFilter.java new file mode 100644 index 0000000000000..7c29637549db1 --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/MetricsRoleBasedAuthorizationFilter.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.web; + +import java.io.IOException; +import javax.servlet.Filter; +import javax.servlet.FilterChain; +import javax.servlet.FilterConfig; +import javax.servlet.ServletException; +import javax.servlet.ServletRequest; +import javax.servlet.ServletResponse; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.authorization.AuthorizationService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Servlet filter that authorize the request only if the client role is a metric role or a super user. + */ +public class MetricsRoleBasedAuthorizationFilter implements Filter { + private static final Logger LOG = LoggerFactory.getLogger(MetricsRoleBasedAuthorizationFilter.class); + + private final AuthorizationService authorizationService; + + public MetricsRoleBasedAuthorizationFilter(AuthorizationService authorizationService) { + this.authorizationService = authorizationService; + } + + protected String getAuthenticatedRole(HttpServletRequest request) { + return (String) request + .getAttribute(AuthenticationFilter.AuthenticatedRoleAttributeName); + } + + protected AuthenticationDataSource getAuthenticatedDataSource(HttpServletRequest request) { + return (AuthenticationDataSource) request + .getAttribute(AuthenticationFilter.AuthenticatedDataAttributeName); + } + + @Override + public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) + throws IOException, ServletException { + try { + final HttpServletRequest httpRequest = (HttpServletRequest) request; + final String role = getAuthenticatedRole(httpRequest); + final AuthenticationDataSource authenticatedDataSource = getAuthenticatedDataSource(httpRequest); + + boolean authorized = authorizationService.allowToScrapeMetrics(role, authenticatedDataSource) + .join(); + if (LOG.isDebugEnabled()) { + LOG.debug("[{}] Role {} authorized to scrape metrics: {}", httpRequest.getRemoteAddr(), role, authorized); + } + if (authorized) { + chain.doFilter(request, response); + } else { + HttpServletResponse httpResponse = (HttpServletResponse) response; + httpResponse.sendError(HttpServletResponse.SC_FORBIDDEN, "Forbidden"); + LOG.warn("[{}] Failed to authorize HTTP request, role {} is not allowed for uri {}", + request.getRemoteAddr(), + role, + httpRequest.getRequestURI()); + } + } catch (Exception e) { + HttpServletResponse httpResponse = (HttpServletResponse) response; + httpResponse.sendError(HttpServletResponse.SC_FORBIDDEN, "Forbidden"); + LOG.error("[{}] Error performing authorization for HTTP", request.getRemoteAddr(), e); + } + } + + @Override + public void init(FilterConfig arg) throws ServletException { + // No init necessary. + } + + @Override + public void destroy() { + // No state to clean up. + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 62d4634fa2d62..a23084e1d7803 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -999,9 +999,12 @@ private void addWebServerHandlers(WebService webService, true, attributeMap, true, Topics.class); // Add metrics servlet + final boolean authenticateMetricsEndpoint = config.isAuthenticateMetricsEndpoint(); webService.addServlet("/metrics", new ServletHolder(metricsServlet), - config.isAuthenticateMetricsEndpoint(), attributeMap); + authenticateMetricsEndpoint, + authenticateMetricsEndpoint, + attributeMap); // Add websocket service addWebSocketServiceHandler(webService, attributeMap, config); @@ -1882,6 +1885,7 @@ public static WorkerConfig initializeWorkerConfigFromBrokerConfig(ServiceConfigu workerConfig.setSuperUserRoles(brokerConfig.getSuperUserRoles()); workerConfig.setProxyRoles(brokerConfig.getProxyRoles()); workerConfig.setFunctionsWorkerEnablePackageManagement(brokerConfig.isFunctionsWorkerEnablePackageManagement()); + workerConfig.setMetricsRoles(brokerConfig.getMetricsRoles()); // inherit the nar package locations if (isBlank(workerConfig.getFunctionsWorkerServiceNarPackage())) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java index 2d6a6af58477e..488e7546e441b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java @@ -26,10 +26,13 @@ import java.util.Map; import java.util.Optional; import javax.servlet.DispatcherType; +import javax.servlet.http.HttpServletRequest; import lombok.Getter; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.jetty.tls.JettySslContextFactory; import org.eclipse.jetty.server.ConnectionLimit; import org.eclipse.jetty.server.Handler; @@ -200,6 +203,7 @@ private void addResourceServlet(String basePath, boolean requiresAuthentication, private static class FilterInitializer { private final List filterHolders = new ArrayList<>(); private final FilterHolder authenticationFilterHolder; + private final FilterHolder metricsAuthorizationFilterHolder; FilterInitializer(PulsarService pulsarService) { ServiceConfiguration config = pulsarService.getConfiguration(); if (config.getMaxConcurrentHttpRequests() > 0) { @@ -227,8 +231,19 @@ private static class FilterInitializer { authenticationFilterHolder = new FilterHolder(new AuthenticationFilter( pulsarService.getBrokerService().getAuthenticationService())); filterHolders.add(authenticationFilterHolder); + if (config.isAuthorizationEnabled()) { + final AuthorizationService authorizationService = pulsarService + .getBrokerService().getAuthorizationService(); + metricsAuthorizationFilterHolder = new FilterHolder( + new MetricsRoleBasedAuthorizationFilter(authorizationService) + ); + filterHolders.add(metricsAuthorizationFilterHolder); + } else { + metricsAuthorizationFilterHolder = null; + } } else { authenticationFilterHolder = null; + metricsAuthorizationFilterHolder = null; } if (config.isDisableHttpDebugMethods()) { @@ -246,12 +261,16 @@ private static class FilterInitializer { } } - public void addFilters(ServletContextHandler context, boolean requiresAuthentication) { + public void addFilters(ServletContextHandler context, boolean requiresAuthentication, + boolean requiresMetricsAuthorization) { for (FilterHolder filterHolder : filterHolders) { - if (requiresAuthentication || filterHolder != authenticationFilterHolder) { - context.addFilter(filterHolder, - MATCH_ALL, EnumSet.allOf(DispatcherType.class)); + if (filterHolder == authenticationFilterHolder && !requiresAuthentication) { + continue; + } + if (filterHolder == metricsAuthorizationFilterHolder && !requiresMetricsAuthorization) { + continue; } + context.addFilter(filterHolder, MATCH_ALL, EnumSet.allOf(DispatcherType.class)); } } @@ -259,6 +278,13 @@ public void addFilters(ServletContextHandler context, boolean requiresAuthentica public void addServlet(String path, ServletHolder servletHolder, boolean requiresAuthentication, Map attributeMap) { + addServlet(path, servletHolder, requiresAuthentication, false, attributeMap); + } + + public void addServlet(String path, ServletHolder servletHolder, + boolean requiresAuthentication, + boolean requiresMetricsAuthorization, + Map attributeMap) { ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); // Notice: each context path should be unique, but there's nothing here to verify that context.setContextPath(path); @@ -268,7 +294,7 @@ public void addServlet(String path, ServletHolder servletHolder, boolean require context.setAttribute(key, value); }); } - filterInitializer.addFilters(context, requiresAuthentication); + filterInitializer.addFilters(context, requiresAuthentication, requiresMetricsAuthorization); handlers.add(context); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthentication.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthentication.java index 0b1726617f71f..2ff556c3e6815 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthentication.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthentication.java @@ -29,6 +29,7 @@ public class MockAuthentication implements Authentication { private static final Logger log = LoggerFactory.getLogger(MockAuthentication.class); + public static final String HTTP_HEADER_USER = "mockuser"; private final String user; public MockAuthentication(String user) { @@ -52,7 +53,7 @@ public AuthenticationDataProvider getAuthData() throws PulsarClientException { public String getHttpAuthType() { return "mock"; } @Override public Set> getHttpHeaders() { - return Map.of("mockuser", user).entrySet(); + return Map.of(HTTP_HEADER_USER, user).entrySet(); } @Override public boolean hasDataFromCommand() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthenticationProvider.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthenticationProvider.java index f896ecefbc9c7..2b2acc9add4e1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthenticationProvider.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockAuthenticationProvider.java @@ -45,7 +45,7 @@ public String getAuthMethodName() { public String authenticate(AuthenticationDataSource authData) throws AuthenticationException { String principal = "unknown"; if (authData.hasDataFromHttp()) { - principal = authData.getHttpHeader("mockuser"); + principal = authData.getHttpHeader(MockAuthentication.HTTP_HEADER_USER); } else if (authData.hasDataFromCommand()) { principal = authData.getCommandData(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetricsAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetricsAuthenticationTest.java index 0a76e681120ae..ca1b8c8078208 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetricsAuthenticationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetricsAuthenticationTest.java @@ -23,6 +23,7 @@ import javax.ws.rs.core.Response; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.auth.MockAuthentication; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.glassfish.jersey.client.ClientConfig; import org.glassfish.jersey.logging.LoggingFeature; @@ -40,6 +41,8 @@ protected void setup() throws Exception { conf.setAuthenticationEnabled(true); conf.setAuthenticationProviders( Sets.newHashSet("org.apache.pulsar.broker.auth.MockAuthenticationProvider")); + conf.setSuperUserRoles(Sets.newHashSet("pass.super")); + conf.setMetricsRoles(Sets.newHashSet("pass.metrics")); conf.setAuthorizationEnabled(true); } @@ -50,21 +53,44 @@ protected void cleanup() throws Exception { } @Test - void testGetMetricsByAuthenticate() throws Exception { + void testAuthenticateMetrics() throws Exception { conf.setAuthenticateMetricsEndpoint(true); super.internalSetup(); - @Cleanup - Client client = javax.ws.rs.client.ClientBuilder.newClient(new ClientConfig().register(LoggingFeature.class)); - Response r = client.target(this.pulsar.getWebServiceAddress()).path("/metrics").request().get(); - Assert.assertEquals(r.getStatus(), Response.Status.UNAUTHORIZED.getStatusCode()); + Assert.assertEquals( + requestMetrics("fail.fail").getStatus(), + Response.Status.UNAUTHORIZED.getStatusCode() + ); + + Assert.assertEquals( + requestMetrics("pass.nometrics").getStatus(), + Response.Status.FORBIDDEN.getStatusCode() + ); + Assert.assertEquals( + requestMetrics("pass.super").getStatus(), + Response.Status.OK.getStatusCode() + ); + Assert.assertEquals( + requestMetrics("pass.metrics").getStatus(), + Response.Status.OK.getStatusCode() + ); } @Test void testGetMetricsByDefault() throws Exception { super.internalSetup(); + Response r = requestMetrics("fail.fail"); + Assert.assertEquals(r.getStatus(), Response.Status.OK.getStatusCode()); + } + + private Response requestMetrics(String user) { @Cleanup Client client = javax.ws.rs.client.ClientBuilder.newClient(new ClientConfig().register(LoggingFeature.class)); - Response r = client.target(this.pulsar.getWebServiceAddress()).path("/metrics").request().get(); - Assert.assertEquals(r.getStatus(), Response.Status.OK.getStatusCode()); + + return client + .target(this.pulsar.getWebServiceAddress()) + .path("/metrics") + .request() + .header(MockAuthentication.HTTP_HEADER_USER, user) + .get(); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerMetricsAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerMetricsAuthTest.java new file mode 100644 index 0000000000000..9636c7f797f5c --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerMetricsAuthTest.java @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.worker; + +import com.google.common.collect.Sets; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.ServiceConfigurationUtils; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.broker.authorization.AuthorizationService; +import org.apache.pulsar.broker.resources.PulsarResources; +import org.apache.pulsar.client.admin.Namespaces; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.admin.Tenants; +import org.apache.pulsar.client.admin.internal.JacksonConfigurator; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.tls.NoopHostnameVerifier; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.common.util.SecurityUtility; +import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory; +import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig; +import org.apache.pulsar.functions.worker.rest.WorkerServer; +import org.glassfish.jersey.client.ClientConfig; +import org.glassfish.jersey.client.ClientProperties; +import org.glassfish.jersey.jackson.JacksonFeature; +import org.glassfish.jersey.media.multipart.MultiPartFeature; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import javax.net.ssl.SSLContext; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.WebTarget; +import java.security.cert.X509Certificate; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +@Slf4j +@Test(groups = "broker-admin") +public class PulsarWorkerMetricsAuthTest extends MockedPulsarServiceBaseTest { + + private static String getTLSFile(String name) { + return String.format("./src/test/resources/authentication/tls-http/%s.pem", name); + } + + WorkerConfig workerConfig; + WorkerServer workerServer; + PulsarFunctionTestTemporaryDirectory tempDirectory; + String adminUrl; + + @BeforeMethod + @Override + public void setup() throws Exception { + conf.setLoadBalancerEnabled(true); + conf.setBrokerServicePortTls(Optional.of(0)); + conf.setWebServicePortTls(Optional.of(0)); + conf.setTlsCertificateFilePath(getTLSFile("broker.cert")); + conf.setTlsKeyFilePath(getTLSFile("broker.key-pk8")); + conf.setTlsTrustCertsFilePath(getTLSFile("ca.cert")); + conf.setAuthenticationEnabled(true); + conf.setAuthenticationProviders( + Set.of("org.apache.pulsar.broker.authentication.AuthenticationProviderTls")); + conf.setSuperUserRoles(Set.of("admin", "superproxy")); + conf.setProxyRoles(Set.of("proxy", "superproxy")); + conf.setAuthorizationEnabled(true); + + conf.setBrokerClientAuthenticationPlugin("org.apache.pulsar.client.impl.auth.AuthenticationTls"); + conf.setBrokerClientAuthenticationParameters( + String.format("tlsCertFile:%s,tlsKeyFile:%s", getTLSFile("admin.cert"), getTLSFile("admin.key-pk8"))); + conf.setBrokerClientTrustCertsFilePath(getTLSFile("ca.cert")); + conf.setBrokerClientTlsEnabled(true); + conf.setNumExecutorThreadPoolSize(5); + + super.internalSetup(); + + PulsarAdmin admin = mock(PulsarAdmin.class); + Tenants tenants = mock(Tenants.class); + when(admin.tenants()).thenReturn(tenants); + Set admins = Sets.newHashSet("superUser", "admin"); + TenantInfoImpl tenantInfo = new TenantInfoImpl(admins, null); + when(tenants.getTenantInfo(any())).thenReturn(tenantInfo); + Namespaces namespaces = mock(Namespaces.class); + when(admin.namespaces()).thenReturn(namespaces); + + final WorkerStatsManager workerStatsManager = mock(WorkerStatsManager.class); + final FunctionRuntimeManager functionRuntimeManager = mock(FunctionRuntimeManager.class); + when(functionRuntimeManager.getFunctionRuntimeInfos()).thenReturn(Collections.emptyMap()); + when(functionRuntimeManager.getMyInstances()).thenReturn(0); + + final PulsarWorkerService functionsWorkerService = spy(createPulsarFunctionWorker(conf, admin)); + doNothing().when(functionsWorkerService).initAsStandalone(any(WorkerConfig.class)); + when(functionsWorkerService.getBrokerAdmin()).thenReturn(admin); + when(functionsWorkerService.getFunctionRuntimeManager()).thenReturn(functionRuntimeManager); + when(functionsWorkerService.getWorkerStatsManager()).thenReturn(workerStatsManager); + functionsWorkerService.init(workerConfig, null, false); + + AuthenticationService authenticationService = new AuthenticationService(conf); + AuthorizationService authorizationService = new AuthorizationService(conf, mock(PulsarResources.class)); + when(functionsWorkerService.getAuthenticationService()).thenReturn(authenticationService); + when(functionsWorkerService.getAuthorizationService()).thenReturn(authorizationService); + when(functionsWorkerService.isInitialized()).thenReturn(true); + + // mock: once authentication passes, function should return response: function already exist + FunctionMetaDataManager dataManager = mock(FunctionMetaDataManager.class); + when(dataManager.containsFunction(any(), any(), any())).thenReturn(true); + when(functionsWorkerService.getFunctionMetaDataManager()).thenReturn(dataManager); + + workerServer = new WorkerServer(functionsWorkerService, authenticationService, authorizationService); + workerServer.start(); + adminUrl = String.format("https://%s:%s", + functionsWorkerService.getWorkerConfig().getWorkerHostname(), workerServer.getListenPortHTTPS().get()); + } + + + private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration config, + PulsarAdmin mockPulsarAdmin) { + workerConfig = new WorkerConfig(); + tempDirectory = PulsarFunctionTestTemporaryDirectory.create(getClass().getSimpleName()); + tempDirectory.useTemporaryDirectoriesForWorkerConfig(workerConfig); + // workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace); + workerConfig.setSchedulerClassName( + org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName()); + workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName()); + workerConfig.setFunctionRuntimeFactoryConfigs( + ObjectMapperFactory.getMapper().getObjectMapper() + .convertValue(new ThreadRuntimeFactoryConfig().setThreadGroupName("use"), Map.class)); + // worker talks to local broker + workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePort().get()); + workerConfig.setPulsarWebServiceUrl("https://127.0.0.1:" + config.getWebServicePort().get()); + workerConfig.setFailureCheckFreqMs(100); + workerConfig.setNumFunctionPackageReplicas(1); + workerConfig.setClusterCoordinationTopicName("coordinate"); + workerConfig.setFunctionAssignmentTopicName("assignment"); + workerConfig.setFunctionMetadataTopicName("metadata"); + workerConfig.setInstanceLivenessCheckFreqMs(100); + workerConfig.setWorkerPort(null); + workerConfig.setPulsarFunctionsCluster(config.getClusterName()); + String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress()); + workerConfig.setWorkerHostname(hostname); + workerConfig.setWorkerId("c-" + config.getClusterName() + "-fw-" + hostname + "-" + workerConfig.getWorkerPort()); + + workerConfig.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName()); + workerConfig.setBrokerClientAuthenticationParameters( + String.format("tlsCertFile:%s,tlsKeyFile:%s", getTLSFile("admin.cert"), getTLSFile("admin.key-pk8"))); + workerConfig.setUseTls(true); + workerConfig.setTlsAllowInsecureConnection(true); + workerConfig.setTlsTrustCertsFilePath(getTLSFile("ca.cert")); + + workerConfig.setWorkerPortTls(0); + workerConfig.setTlsEnabled(true); + workerConfig.setTlsCertificateFilePath(getTLSFile("broker.cert")); + workerConfig.setTlsKeyFilePath(getTLSFile("broker.key-pk8")); + + workerConfig.setAuthenticationEnabled(true); + workerConfig.setAuthorizationEnabled(true); + + workerConfig.setSuperUserRoles(Sets.newHashSet("superUser", "admin")); + workerConfig.setMetricsRoles(Sets.newHashSet("user1")); + + PulsarWorkerService workerService = new PulsarWorkerService(new PulsarWorkerService.PulsarClientCreator() { + @Override + public PulsarAdmin newPulsarAdmin(String pulsarServiceUrl, WorkerConfig workerConfig) { + return mockPulsarAdmin; + } + + @Override + public PulsarClient newPulsarClient(String pulsarServiceUrl, WorkerConfig workerConfig) { + return null; + } + }); + + return workerService; + } + + + @AfterMethod(alwaysRun = true) + @Override + public void cleanup() throws Exception { + super.internalCleanup(); + workerServer.stop(); + } + + WebTarget buildWebClient(String user) throws Exception { + ClientConfig httpConfig = new ClientConfig(); + httpConfig.property(ClientProperties.FOLLOW_REDIRECTS, true); + httpConfig.property(ClientProperties.ASYNC_THREADPOOL_SIZE, 8); + httpConfig.register(MultiPartFeature.class); + + ClientBuilder clientBuilder = ClientBuilder.newBuilder().withConfig(httpConfig) + .register(JacksonConfigurator.class).register(JacksonFeature.class); + + X509Certificate trustCertificates[] = SecurityUtility.loadCertificatesFromPemFile( + getTLSFile("ca.cert")); + SSLContext sslCtx = SecurityUtility.createSslContext( + false, trustCertificates, + SecurityUtility.loadCertificatesFromPemFile(getTLSFile(user + ".cert")), + SecurityUtility.loadPrivateKeyFromPemFile(getTLSFile(user + ".key-pk8"))); + clientBuilder.sslContext(sslCtx).hostnameVerifier(NoopHostnameVerifier.INSTANCE); + Client client = clientBuilder.build(); + + return client.target(adminUrl); + } + + PulsarAdmin buildAdminClient(String user) throws Exception { + return PulsarAdmin.builder() + .allowTlsInsecureConnection(false) + .enableTlsHostnameVerification(false) + .serviceHttpUrl(adminUrl) + .authentication("org.apache.pulsar.client.impl.auth.AuthenticationTls", + String.format("tlsCertFile:%s,tlsKeyFile:%s", + getTLSFile(user + ".cert"), getTLSFile(user + ".key-pk8"))) + .tlsTrustCertsFilePath(getTLSFile("ca.cert")).build(); + } + + @Test + public void testWorkerMetricsStats() throws Exception { + try (PulsarAdmin admin = buildAdminClient("admin")) { + admin.worker().getMetrics(); + } + try (PulsarAdmin admin = buildAdminClient("user1")) { + admin.worker().getMetrics(); + } + try (PulsarAdmin admin = buildAdminClient("proxy")) { + admin.worker().getMetrics(); + fail(); + } catch (PulsarAdminException.NotAuthorizedException ex) { + assertTrue(ex.getMessage().contains("Forbidden")); + } + } + + + @Test + public void testWorkerFunctionsStats() throws Exception { + try (PulsarAdmin admin = buildAdminClient("admin")) { + admin.worker().getFunctionsStats(); + } + try (PulsarAdmin admin = buildAdminClient("user1")) { + admin.worker().getFunctionsStats(); + } + try (PulsarAdmin admin = buildAdminClient("proxy")) { + admin.worker().getFunctionsStats(); + fail(); + } catch (PulsarAdminException.NotAuthorizedException ex) { + assertTrue(ex.getMessage().contains("Forbidden")); + } + } + + @Test + public void testWorkerMetrics() throws Exception { + assertEquals(200, buildWebClient("admin") + .path("/metrics") + .request() + .get().getStatus()); + assertEquals(200, buildWebClient("user1") + .path("/metrics") + .request() + .get().getStatus()); + assertEquals(403, buildWebClient("proxy") + .path("/metrics") + .request() + .get().getStatus()); + } + + @Test + public void testWorkerInitialized() throws Exception { + assertEquals(200, buildWebClient("admin") + .path("/initialized") + .request() + .get().getStatus()); + assertEquals(200, buildWebClient("user1") + .path("/initialized") + .request() + .get().getStatus()); + assertEquals(403, buildWebClient("proxy") + .path("/initialized") + .request() + .get().getStatus()); + } + + @Test + public void testWorkerVersion() throws Exception { + assertEquals(200, buildWebClient("admin") + .path("/version") + .request() + .get().getStatus()); + assertEquals(200, buildWebClient("user1") + .path("/version") + .request() + .get().getStatus()); + assertEquals(403, buildWebClient("proxy") + .path("/version") + .request() + .get().getStatus()); + } + +} \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java index 810ac69ac3eb3..748693a65bb3d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java @@ -157,7 +157,7 @@ void setup(Method method) throws Exception { when(dataManager.containsFunction(any(), any(), any())).thenReturn(true); when(functionsWorkerService.getFunctionMetaDataManager()).thenReturn(dataManager); - workerServer = new WorkerServer(functionsWorkerService, authenticationService); + workerServer = new WorkerServer(functionsWorkerService, authenticationService, authorizationService); workerServer.start(); Thread.sleep(2000); String functionTlsUrl = String.format("https://%s:%s", diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ProxyStatsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ProxyStatsImpl.java index e98d9bf57b31e..d82419869a04e 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ProxyStatsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ProxyStatsImpl.java @@ -23,11 +23,6 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Authentication; -/** - * Pulsar Admin API client. - * - * - */ public class ProxyStatsImpl extends BaseResource implements ProxyStats { private final WebTarget adminProxyStats; @@ -40,8 +35,7 @@ public ProxyStatsImpl(WebTarget target, Authentication auth, long readTimeoutMs) @Override public String getConnections() throws PulsarAdminException { try { - String json = request(adminProxyStats.path("/connections")).get(String.class); - return json; + return request(adminProxyStats.path("/connections")).get(String.class); } catch (Exception e) { throw getApiException(e); } @@ -50,8 +44,7 @@ public String getConnections() throws PulsarAdminException { @Override public String getTopics() throws PulsarAdminException { try { - String json = request(adminProxyStats.path("/topics")).get(String.class); - return json; + return request(adminProxyStats.path("/topics")).get(String.class); } catch (Exception e) { throw getApiException(e); } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java index 0ed73953d7aa7..fb2fcbe64d8e9 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java @@ -114,6 +114,12 @@ public class WorkerConfig implements Serializable, PulsarConfiguration { + "'authenticationEnabled' must also be set for this to take effect." ) private boolean authenticateMetricsEndpoint = true; + @FieldContext( + category = CATEGORY_WORKER, + doc = "A list of role names (a comma-separated list of strings) able to retrieve metrics" + + " and system stats from the web server endpoints." + ) + private Set metricsRoles = new TreeSet<>(); @FieldContext( category = CATEGORY_WORKER, doc = "Whether the '/metrics' endpoint should return default prometheus metrics. Defaults to false." diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java index 10b353505000f..ddf2438281788 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java @@ -53,8 +53,10 @@ public Worker(WorkerConfig workerConfig) { protected void start() throws Exception { workerService.initAsStandalone(workerConfig); - workerService.start(getAuthenticationService(), getAuthorizationService(), errorNotifier); - server = new WorkerServer(workerService, getAuthenticationService()); + final AuthenticationService authenticationService = getAuthenticationService(); + final AuthorizationService authorizationService = getAuthorizationService(); + workerService.start(authenticationService, authorizationService, errorNotifier); + server = new WorkerServer(workerService, authenticationService, authorizationService); server.start(); log.info("/** Started worker server **/"); @@ -72,7 +74,6 @@ private AuthorizationService getAuthorizationService() throws PulsarServerExcept if (this.workerConfig.isAuthorizationEnabled()) { - log.info("starting configuration cache service"); try { configMetadataStore = PulsarResources.createConfigMetadataStore( workerConfig.getConfigurationMetadataStoreUrl(), @@ -83,7 +84,7 @@ private AuthorizationService getAuthorizationService() throws PulsarServerExcept } pulsarResources = new PulsarResources(null, configMetadataStore); return new AuthorizationService(getServiceConfiguration(), this.pulsarResources); - } + } return null; } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java index 2b3ea30121015..fe1cb9e1ec832 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java @@ -24,9 +24,13 @@ import java.util.List; import java.util.Optional; import javax.servlet.DispatcherType; +import javax.servlet.http.HttpServletRequest; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.web.AuthenticationFilter; +import org.apache.pulsar.broker.web.MetricsRoleBasedAuthorizationFilter; import org.apache.pulsar.broker.web.JettyRequestLogFactory; import org.apache.pulsar.broker.web.RateLimitingFilter; import org.apache.pulsar.broker.web.WebExecutorThreadPool; @@ -57,7 +61,6 @@ public class WorkerServer { private final WorkerConfig workerConfig; private final WorkerService workerService; - private final AuthenticationService authenticationService; private static final String MATCH_ALL = "/*"; private final WebExecutorThreadPool webServerExecutor; private Server server; @@ -67,13 +70,14 @@ public class WorkerServer { private final FilterInitializer filterInitializer; - public WorkerServer(WorkerService workerService, AuthenticationService authenticationService) { + public WorkerServer(WorkerService workerService, + AuthenticationService authenticationService, + AuthorizationService authorizationService) { this.workerConfig = workerService.getWorkerConfig(); this.workerService = workerService; - this.authenticationService = authenticationService; this.webServerExecutor = new WebExecutorThreadPool(this.workerConfig.getNumHttpServerThreads(), "function-web", this.workerConfig.getHttpServerThreadPoolQueueSize()); - this.filterInitializer = new FilterInitializer(workerConfig, authenticationService); + this.filterInitializer = new FilterInitializer(workerConfig, authenticationService, authorizationService); init(); } @@ -103,10 +107,10 @@ private void init() { new ResourceConfig(Resources.getApiV2Resources()), workerService, filterInitializer)); handlers.add(newServletContextHandler("/admin/v3", new ResourceConfig(Resources.getApiV3Resources()), workerService, filterInitializer)); - // don't require auth for metrics or config routes + final boolean authenticateMetricsEndpoint = workerConfig.isAuthenticateMetricsEndpoint(); handlers.add(newServletContextHandler("/", new ResourceConfig(Resources.getRootResources()), workerService, - workerConfig.isAuthenticateMetricsEndpoint(), filterInitializer)); + authenticateMetricsEndpoint, authenticateMetricsEndpoint, filterInitializer)); RequestLogHandler requestLogHandler = new RequestLogHandler(); requestLogHandler.setRequestLog(JettyRequestLogFactory.createRequestLogger()); @@ -174,11 +178,15 @@ private void init() { server.setConnectors(connectors.toArray(new ServerConnector[connectors.size()])); } + private static class FilterInitializer { private final List filterHolders = new ArrayList<>(); private final FilterHolder authenticationFilterHolder; + private final FilterHolder metricsAuthorizationFilterHolder; - FilterInitializer(WorkerConfig config, AuthenticationService authenticationService) { + FilterInitializer(WorkerConfig config, + AuthenticationService authenticationService, + AuthorizationService authorizationService) { if (config.getMaxConcurrentHttpRequests() > 0) { FilterHolder filterHolder = new FilterHolder(QoSFilter.class); filterHolder.setInitParameter("maxRequests", String.valueOf(config.getMaxConcurrentHttpRequests())); @@ -193,17 +201,31 @@ private static class FilterInitializer { if (config.isAuthenticationEnabled()) { authenticationFilterHolder = new FilterHolder(new AuthenticationFilter(authenticationService)); filterHolders.add(authenticationFilterHolder); + if (config.isAuthorizationEnabled()) { + metricsAuthorizationFilterHolder = new FilterHolder( + new MetricsRoleBasedAuthorizationFilter(authorizationService) + ); + filterHolders.add(metricsAuthorizationFilterHolder); + } else { + metricsAuthorizationFilterHolder = null; + } } else { + metricsAuthorizationFilterHolder = null; authenticationFilterHolder = null; } } - public void addFilters(ServletContextHandler context, boolean requiresAuthentication) { + public void addFilters(ServletContextHandler context, + boolean requiresAuthentication, + boolean requiresMetricsAuthorization) { for (FilterHolder filterHolder : filterHolders) { - if (requiresAuthentication || filterHolder != authenticationFilterHolder) { - context.addFilter(filterHolder, - MATCH_ALL, EnumSet.allOf(DispatcherType.class)); + if (filterHolder == authenticationFilterHolder && !requiresAuthentication) { + continue; + } + if (filterHolder == metricsAuthorizationFilterHolder && !requiresMetricsAuthorization) { + continue; } + context.addFilter(filterHolder, MATCH_ALL, EnumSet.allOf(DispatcherType.class)); } } } @@ -212,13 +234,15 @@ static ServletContextHandler newServletContextHandler(String contextPath, ResourceConfig config, WorkerService workerService, FilterInitializer filterInitializer) { - return newServletContextHandler(contextPath, config, workerService, true, filterInitializer); + return newServletContextHandler(contextPath, config, + workerService, true, false, filterInitializer); } static ServletContextHandler newServletContextHandler(String contextPath, ResourceConfig config, WorkerService workerService, boolean requireAuthentication, + boolean requireMetricsAuthorization, FilterInitializer filterInitializer) { final ServletContextHandler contextHandler = new ServletContextHandler(ServletContextHandler.NO_SESSIONS); @@ -232,7 +256,7 @@ static ServletContextHandler newServletContextHandler(String contextPath, new ServletHolder(new ServletContainer(config)); contextHandler.addServlet(apiServlet, MATCH_ALL); - filterInitializer.addFilters(contextHandler, requireAuthentication); + filterInitializer.addFilters(contextHandler, requireAuthentication, requireMetricsAuthorization); return contextHandler; } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java index 0b77be4ab0212..75171fd0a118c 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java @@ -127,6 +127,28 @@ public Map> getAssignments(AuthenticationParameters a return ret; } + private void throwIfNotAuthorizedToScrapeMetrics(AuthenticationParameters authParams, String action) { + if (worker().getWorkerConfig().isAuthorizationEnabled()) { + return; + } + + final int metadataStoreOperationTimeoutSeconds = worker() + .getWorkerConfig().getMetadataStoreOperationTimeoutSeconds(); + try { + if (authParams.getClientRole() == null || !worker().getAuthorizationService() + .allowToScrapeMetrics(authParams).get(metadataStoreOperationTimeoutSeconds, SECONDS)) { + log.error("Client with role [{}] and originalPrincipal [{}] is not authorized to {}", + authParams.getClientRole(), authParams.getOriginalPrincipal(), action); + throw new RestException(Status.UNAUTHORIZED, "Client is not authorized to perform operation"); + } + } catch (ExecutionException | TimeoutException | InterruptedException e) { + log.warn("Time-out {} sec while checking the role {} originalPrincipal {} is a allowed to scrape metrics", + metadataStoreOperationTimeoutSeconds, + authParams.getClientRole(), authParams.getOriginalPrincipal()); + throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage()); + } + } + private void throwIfNotSuperUser(AuthenticationParameters authParams, String action) { if (worker().getWorkerConfig().isAuthorizationEnabled()) { try { @@ -150,7 +172,7 @@ public List getWorkerMetrics(final Authe if (!isWorkerServiceAvailable() || worker().getMetricsGenerator() == null) { throwUnavailableException(); } - throwIfNotSuperUser(authParams, "get worker stats"); + throwIfNotAuthorizedToScrapeMetrics(authParams, "get worker stats"); return worker().getMetricsGenerator().generate(); } @@ -161,7 +183,7 @@ public List getFunctionsMetrics(AuthenticationParam throwUnavailableException(); } - throwIfNotSuperUser(authParams, "get function stats"); + throwIfNotAuthorizedToScrapeMetrics(authParams, "get function stats"); Map functionRuntimes = worker().getFunctionRuntimeManager() .getFunctionRuntimeInfos(); diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java index 3ecd670cbbf7a..39bff67aba0f9 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java @@ -287,6 +287,11 @@ public class ProxyConfiguration implements PulsarConfiguration { ) private Optional webServicePortTls = Optional.empty(); + @FieldContext( + category = CATEGORY_SERVER, + doc = "Control whether the proxy-stats endpoint should be enabled or not.") + private boolean exposeProxyStatsEndpoint = true; + @FieldContext( category = CATEGORY_KEYSTORE_TLS, doc = "Specify the TLS provider for the web service, available values can be SunJSSE, Conscrypt and etc." @@ -378,6 +383,13 @@ public class ProxyConfiguration implements PulsarConfiguration { ) private boolean authenticateMetricsEndpoint = true; + @FieldContext( + category = CATEGORY_AUTHORIZATION, + doc = "A list of role names (a comma-separated list of strings) able to retrieve metrics" + + " and system stats from the web server endpoints." + ) + private Set metricsRoles = new TreeSet<>(); + @FieldContext( category = CATEGORY_SASL_AUTH, diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java index a934b8b078426..e4f20ee38cd5f 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java @@ -54,6 +54,7 @@ import java.util.concurrent.atomic.AtomicReference; import lombok.Getter; import lombok.Setter; +import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.ServiceConfigurationUtils; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.authorization.AuthorizationService; @@ -82,6 +83,7 @@ public class ProxyService implements Closeable { private final ProxyConfiguration proxyConfig; + private final ServiceConfiguration serviceConfiguration; private final Authentication proxyClientAuthentication; @Getter private final DnsAddressResolverGroup dnsAddressResolverGroup; @@ -90,10 +92,10 @@ public class ProxyService implements Closeable { private String serviceUrl; private String serviceUrlTls; private final AuthenticationService authenticationService; - private AuthorizationService authorizationService; - private MetadataStoreExtended localMetadataStore; - private MetadataStoreExtended configMetadataStore; - private PulsarResources pulsarResources; + private final AuthorizationService authorizationService; + private final MetadataStoreExtended localMetadataStore; + private final MetadataStoreExtended configMetadataStore; + private final PulsarResources pulsarResources; @Getter private ProxyExtensions proxyExtensions = null; @@ -151,10 +153,10 @@ public class ProxyService implements Closeable { @Getter private final ConnectionController connectionController; - public ProxyService(ProxyConfiguration proxyConfig, - AuthenticationService authenticationService) throws Exception { + public ProxyService(ProxyConfiguration proxyConfig) throws Exception { requireNonNull(proxyConfig); this.proxyConfig = proxyConfig; + this.serviceConfiguration = PulsarConfigurationLoader.convertFrom(proxyConfig); this.clientCnxs = Sets.newConcurrentHashSet(); this.topicStats = new ConcurrentHashMap<>(); @@ -170,7 +172,26 @@ public ProxyService(ProxyConfiguration proxyConfig, false, acceptorThreadFactory); this.workerGroup = EventLoopUtil.newEventLoopGroup(proxyConfig.getNumIOThreads(), false, workersThreadFactory); - this.authenticationService = authenticationService; + + if (proxyConfig.isAuthorizationEnabled() && !proxyConfig.isAuthenticationEnabled()) { + throw new IllegalStateException("Invalid proxy configuration. Authentication must be enabled with " + + "authenticationEnabled=true when authorization is enabled with authorizationEnabled=true."); + } + this.authenticationService = new AuthenticationService(serviceConfiguration); + if (!isBlank(proxyConfig.getMetadataStoreUrl()) && !isBlank(proxyConfig.getConfigurationMetadataStoreUrl())) { + localMetadataStore = createLocalMetadataStore(); + configMetadataStore = createConfigurationMetadataStore(); + pulsarResources = new PulsarResources(localMetadataStore, configMetadataStore); + discoveryProvider = new BrokerDiscoveryProvider(this.proxyConfig, pulsarResources); + authorizationService = new AuthorizationService(PulsarConfigurationLoader.convertFrom(proxyConfig), + pulsarResources); + } else { + localMetadataStore = null; + configMetadataStore = null; + pulsarResources = null; + discoveryProvider = null; + authorizationService = null; + } DnsNameResolverBuilder dnsNameResolverBuilder = new DnsNameResolverBuilder() .channelType(EventLoopUtil.getDatagramChannelClass(workerGroup)); @@ -213,20 +234,6 @@ public ProxyService(ProxyConfiguration proxyConfig, } public void start() throws Exception { - if (proxyConfig.isAuthorizationEnabled() && !proxyConfig.isAuthenticationEnabled()) { - throw new IllegalStateException("Invalid proxy configuration. Authentication must be enabled with " - + "authenticationEnabled=true when authorization is enabled with authorizationEnabled=true."); - } - - if (!isBlank(proxyConfig.getMetadataStoreUrl()) && !isBlank(proxyConfig.getConfigurationMetadataStoreUrl())) { - localMetadataStore = createLocalMetadataStore(); - configMetadataStore = createConfigurationMetadataStore(); - pulsarResources = new PulsarResources(localMetadataStore, configMetadataStore); - discoveryProvider = new BrokerDiscoveryProvider(this.proxyConfig, pulsarResources); - authorizationService = new AuthorizationService(PulsarConfigurationLoader.convertFrom(proxyConfig), - pulsarResources); - } - ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.option(ChannelOption.SO_REUSEADDR, true); bootstrap.childOption(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT); diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java index beee9f1a4f763..800d6daf62037 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java @@ -40,7 +40,6 @@ import org.apache.logging.log4j.core.util.datetime.FixedDateFormat; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet; import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithClassLoader; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; @@ -198,12 +197,10 @@ public static void main(String[] args) throws Exception { } public void start() throws Exception { - AuthenticationService authenticationService = new AuthenticationService( - PulsarConfigurationLoader.convertFrom(config)); // create proxy service - proxyService = new ProxyService(config, authenticationService); + proxyService = new ProxyService(config); // create a web-service - server = new WebServer(config, authenticationService); + server = new WebServer(config, proxyService.getAuthenticationService(), proxyService.getAuthorizationService()); Runtime.getRuntime().addShutdownHook(new Thread(this::close)); @@ -259,12 +256,18 @@ public static void addWebServerHandlers(WebServer server, if (service != null) { PrometheusMetricsServlet metricsServlet = service.getMetricsServlet(); if (metricsServlet != null) { + final boolean authenticateMetricsEndpoint = config.isAuthenticateMetricsEndpoint(); server.addServlet("/metrics", new ServletHolder(metricsServlet), - Collections.emptyList(), config.isAuthenticateMetricsEndpoint()); + Collections.emptyList(), authenticateMetricsEndpoint, authenticateMetricsEndpoint); } } - server.addRestResource("/", VipStatus.ATTRIBUTE_STATUS_FILE_PATH, config.getStatusFilePath(), VipStatus.class); - server.addRestResource("/proxy-stats", ProxyStats.ATTRIBUTE_PULSAR_PROXY_NAME, service, ProxyStats.class); + server.addRestResource("/", VipStatus.ATTRIBUTE_STATUS_FILE_PATH, + config.getStatusFilePath(), VipStatus.class, false, false); + if (config.isExposeProxyStatsEndpoint()) { + server.addRestResource("/proxy-stats", + ProxyStats.ATTRIBUTE_PULSAR_PROXY_NAME, service, + ProxyStats.class, true, true); + } AdminProxyHandler adminProxyHandler = new AdminProxyHandler(config, discoveryProvider); ServletHolder servletHolder = new ServletHolder(adminProxyHandler); diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java index 1ca8dc93ebf9e..091bf1da2285c 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java @@ -29,9 +29,13 @@ import java.util.List; import java.util.Optional; import javax.servlet.DispatcherType; +import javax.servlet.http.HttpServletRequest; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.web.AuthenticationFilter; +import org.apache.pulsar.broker.web.MetricsRoleBasedAuthorizationFilter; import org.apache.pulsar.broker.web.JettyRequestLogFactory; import org.apache.pulsar.broker.web.JsonMapperProvider; import org.apache.pulsar.broker.web.RateLimitingFilter; @@ -68,7 +72,6 @@ public class WebServer { private final Server server; private final WebExecutorThreadPool webServiceExecutor; - private final AuthenticationService authenticationService; private final List servletPaths = new ArrayList<>(); private final List handlers = new ArrayList<>(); private final ProxyConfiguration config; @@ -80,15 +83,21 @@ public class WebServer { private final FilterInitializer filterInitializer; - public WebServer(ProxyConfiguration config, AuthenticationService authenticationService) { + public WebServer(ProxyService proxyService) { + this(proxyService.getConfiguration(), proxyService.getAuthenticationService(), + proxyService.getAuthorizationService()); + } + + public WebServer(ProxyConfiguration config, + AuthenticationService authenticationService, + AuthorizationService authorizationService) { + this.config = config; this.webServiceExecutor = new WebExecutorThreadPool(config.getHttpNumThreads(), "pulsar-external-web", config.getHttpServerThreadPoolQueueSize()); this.server = new Server(webServiceExecutor); if (config.getMaxHttpServerConnections() > 0) { server.addBean(new ConnectionLimit(config.getMaxHttpServerConnections(), server)); } - this.authenticationService = authenticationService; - this.config = config; List connectors = new ArrayList<>(); @@ -146,7 +155,7 @@ public WebServer(ProxyConfiguration config, AuthenticationService authentication connectors.stream().forEach(c -> c.setAcceptQueueSize(config.getHttpServerAcceptQueueSize())); server.setConnectors(connectors.toArray(new ServerConnector[connectors.size()])); - filterInitializer = new FilterInitializer(config, authenticationService); + filterInitializer = new FilterInitializer(config, authenticationService, authorizationService); } public URI getServiceUri() { @@ -156,8 +165,11 @@ public URI getServiceUri() { private static class FilterInitializer { private final List filterHolders = new ArrayList<>(); private final FilterHolder authenticationFilterHolder; + private final FilterHolder metricsAuthorizationFilterHolder; - FilterInitializer(ProxyConfiguration config, AuthenticationService authenticationService) { + FilterInitializer(ProxyConfiguration config, + AuthenticationService authenticationService, + AuthorizationService authorizationService) { if (config.getMaxConcurrentHttpRequests() > 0) { FilterHolder filterHolder = new FilterHolder(QoSFilter.class); filterHolder.setInitParameter("maxRequests", String.valueOf(config.getMaxConcurrentHttpRequests())); @@ -170,19 +182,34 @@ private static class FilterInitializer { } if (config.isAuthenticationEnabled()) { - authenticationFilterHolder = new FilterHolder(new AuthenticationFilter(authenticationService)); + authenticationFilterHolder = new FilterHolder( + new AuthenticationFilter(authenticationService) + ); filterHolders.add(authenticationFilterHolder); + if (config.isAuthorizationEnabled()) { + metricsAuthorizationFilterHolder = new FilterHolder( + new MetricsRoleBasedAuthorizationFilter(authorizationService) + ); + filterHolders.add(metricsAuthorizationFilterHolder); + } else { + metricsAuthorizationFilterHolder = null; + } } else { authenticationFilterHolder = null; + metricsAuthorizationFilterHolder = null; } } - public void addFilters(ServletContextHandler context, boolean requiresAuthentication) { + public void addFilters(ServletContextHandler context, + boolean requiresAuthentication, boolean requiresMetricsAuthorization) { for (FilterHolder filterHolder : filterHolders) { - if (requiresAuthentication || filterHolder != authenticationFilterHolder) { - context.addFilter(filterHolder, - MATCH_ALL, EnumSet.allOf(DispatcherType.class)); + if (filterHolder == authenticationFilterHolder && !requiresAuthentication) { + continue; + } + if (filterHolder == metricsAuthorizationFilterHolder && !requiresMetricsAuthorization) { + continue; } + context.addFilter(filterHolder, MATCH_ALL, EnumSet.allOf(DispatcherType.class)); } } } @@ -195,10 +222,15 @@ public void addServlet(String basePath, ServletHolder servletHolder, List> attributes, + boolean requireAuthentication) { + addServlet(basePath, servletHolder, attributes, requireAuthentication, false); + } + public void addServlet(String basePath, ServletHolder servletHolder, - List> attributes, boolean requireAuthentication) { + List> attributes, + boolean requireAuthentication, boolean requireMetricsAuthorization) { popularServletParams(servletHolder, config); - Optional existingPath = servletPaths.stream().filter(p -> p.startsWith(basePath)).findFirst(); if (existingPath.isPresent()) { throw new IllegalArgumentException( @@ -213,7 +245,7 @@ public void addServlet(String basePath, ServletHolder servletHolder, context.setAttribute(attribute.getLeft(), attribute.getRight()); } - filterInitializer.addFilters(context, requireAuthentication); + filterInitializer.addFilters(context, requireAuthentication, requireMetricsAuthorization); handlers.add(context); } @@ -231,7 +263,12 @@ private static void popularServletParams(ServletHolder servletHolder, ProxyConfi } } - public void addRestResource(String basePath, String attribute, Object attributeValue, Class resourceClass) { + public void addRestResource(String basePath, + String attribute, + Object attributeValue, + Class resourceClass, + boolean requireAuthentication, + boolean requireMetricsAuthorization) { ResourceConfig config = new ResourceConfig(); config.register(resourceClass); config.register(JsonMapperProvider.class); @@ -241,6 +278,7 @@ public void addRestResource(String basePath, String attribute, Object attributeV context.setContextPath(basePath); context.addServlet(servletHolder, MATCH_ALL); context.setAttribute(attribute, attributeValue); + filterInitializer.addFilters(context, requireAuthentication, requireMetricsAuthorization); handlers.add(context); } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/ProxyStats.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/ProxyStats.java index 27e61c90e9e26..0797292d6b14a 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/ProxyStats.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/ProxyStats.java @@ -101,7 +101,7 @@ public void updateProxyLogLevel(@PathParam("logLevel") int logLevel) { @GET @Path("/logging") @ApiOperation(hidden = true, value = "Get proxy logging") - public int getProxyLogLevel(@PathParam("logLevel") int logLevel) { + public int getProxyLogLevel() { return proxyService().getProxyLogLevel(); } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java index 79662097c3b2f..9247a156c0843 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java @@ -25,8 +25,6 @@ import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; -import org.apache.pulsar.broker.authentication.AuthenticationService; -import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.util.PortManager; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.apache.pulsar.proxy.server.ProxyConfiguration; @@ -141,8 +139,7 @@ protected void setup() throws Exception { proxyConfig.setMetadataStoreUrl(DUMMY_VALUE); proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); - proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + proxyService = Mockito.spy(new ProxyService(proxyConfig)); doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java index af70276aed95e..d0b7fc01813f5 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java @@ -26,9 +26,11 @@ import java.util.Optional; +import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationProviderTls; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -100,8 +102,10 @@ protected void setup() throws Exception { resource = new PulsarResources(new ZKMetadataStore(mockZooKeeper), new ZKMetadataStore(mockZooKeeperGlobal)); - webServer = new WebServer(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig))); + final ServiceConfiguration serviceConf = PulsarConfigurationLoader.convertFrom(proxyConfig); + webServer = new WebServer(proxyConfig, + new AuthenticationService(serviceConf), + new AuthorizationService(serviceConf, resource)); discoveryProvider = spy(new BrokerDiscoveryProvider(proxyConfig, resource)); LoadManagerReport report = new LoadReport(brokerUrl.toString(), brokerUrlTls.toString(), null, null); doReturn(report).when(discoveryProvider).nextBroker(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/InvalidProxyConfigForAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/InvalidProxyConfigForAuthorizationTest.java index c29bfaa964812..3f1c56a20caa0 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/InvalidProxyConfigForAuthorizationTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/InvalidProxyConfigForAuthorizationTest.java @@ -21,8 +21,6 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.fail; -import org.apache.pulsar.broker.authentication.AuthenticationService; -import org.mockito.Mockito; import org.testng.annotations.Test; public class InvalidProxyConfigForAuthorizationTest { @@ -32,8 +30,7 @@ void startupShouldFailWhenAuthorizationIsEnabledWithoutAuthentication() throws E ProxyConfiguration proxyConfiguration = new ProxyConfiguration(); proxyConfiguration.setAuthorizationEnabled(true); proxyConfiguration.setAuthenticationEnabled(false); - try (ProxyService proxyService = new ProxyService(proxyConfiguration, - Mockito.mock(AuthenticationService.class))) { + try (ProxyService proxyService = new ProxyService(proxyConfiguration)) { proxyService.start(); fail("An exception should have been thrown"); } catch (Exception e) { diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java index 17cd3c33e799d..51c142e291442 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java @@ -82,8 +82,7 @@ protected void setup() throws Exception { // this is for nar package test // addServletNar(); - proxyService = Mockito.spy(new ProxyService(proxyConfig, - new AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig)))); + proxyService = Mockito.spy(new ProxyService(proxyConfig)); doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); @@ -96,7 +95,7 @@ protected void setup() throws Exception { mockAdditionalServlet(); - proxyWebServer = new WebServer(proxyConfig, authService); + proxyWebServer = new WebServer(proxyService); ProxyServiceStarter.addWebServerHandlers(proxyWebServer, proxyConfig, proxyService, null); proxyWebServer.start(); } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java index bfe86f86976ee..3812ac5833e17 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java @@ -32,7 +32,6 @@ import lombok.Cleanup; import org.apache.pulsar.broker.authentication.AuthenticationProviderTls; -import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.Consumer; @@ -42,7 +41,6 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.auth.AuthenticationTls; -import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.metadata.impl.ZKMetadataStore; @@ -138,8 +136,7 @@ protected void setup() throws Exception { proxyConfig.setMetadataStoreUrl(DUMMY_VALUE); proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); - proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + proxyService = Mockito.spy(new ProxyService(proxyConfig)); doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); proxyService.start(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java index 8229d929ee5e3..ec7acf4c11915 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java @@ -39,7 +39,6 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authentication.AuthenticationProvider; -import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationDataProvider; @@ -47,7 +46,6 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.policies.data.AuthAction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -230,10 +228,8 @@ void testAuthentication() throws Exception { providers.add(BasicAuthenticationProvider.class.getName()); proxyConfig.setAuthenticationProviders(providers); proxyConfig.setForwardAuthorizationCredentials(true); - AuthenticationService authenticationService = new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)); @Cleanup - ProxyService proxyService = new ProxyService(proxyConfig, authenticationService); + ProxyService proxyService = new ProxyService(proxyConfig); proxyService.start(); final String proxyServiceUrl = proxyService.getServiceUrl(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java index 336f11ae19da6..b0e424c009196 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java @@ -25,12 +25,10 @@ import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; -import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.limiter.ConnectionController; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.awaitility.Awaitility; import org.mockito.Mockito; @@ -59,8 +57,7 @@ protected void setup() throws Exception { proxyConfig.setMaxConcurrentLookupRequests(NUM_CONCURRENT_LOOKUP); proxyConfig.setMaxConcurrentInboundConnections(NUM_CONCURRENT_INBOUND_CONNECTION); proxyConfig.setMaxConcurrentInboundConnectionsPerIp(NUM_CONCURRENT_INBOUND_CONNECTION); - proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + proxyService = Mockito.spy(new ProxyService(proxyConfig)); doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyDisableZeroCopyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyDisableZeroCopyTest.java index 3aa71413d540b..b3a7dfe21c730 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyDisableZeroCopyTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyDisableZeroCopyTest.java @@ -20,8 +20,6 @@ import static org.mockito.Mockito.doReturn; import java.util.Optional; -import org.apache.pulsar.broker.authentication.AuthenticationService; -import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.mockito.Mockito; import org.testng.annotations.BeforeClass; @@ -39,8 +37,7 @@ protected void setup() throws Exception { proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); proxyConfig.setProxyZeroCopyModeEnabled(false); - proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + proxyService = Mockito.spy(new ProxyService(proxyConfig)); doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java index 8b3092c6f5170..a171113c1a1d5 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java @@ -20,13 +20,11 @@ import lombok.Cleanup; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; -import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.client.impl.ProducerImpl; -import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.metadata.impl.ZKMetadataStore; @@ -61,8 +59,7 @@ protected void setup() throws Exception { proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); proxyConfig.setHaProxyProtocolEnabled(true); - proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + proxyService = Mockito.spy(new ProxyService(proxyConfig)); doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java index 99af3b1cf6abe..09eb61b7719df 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java @@ -28,15 +28,13 @@ import lombok.Cleanup; -import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.policies.data.AuthAction; -import org.apache.pulsar.proxy.server.ProxyRolesEnforcementTest.BasicAuthentication; -import org.apache.pulsar.proxy.server.ProxyRolesEnforcementTest.BasicAuthenticationProvider; +import org.apache.pulsar.proxy.server.mocks.BasicAuthentication; +import org.apache.pulsar.proxy.server.mocks.BasicAuthenticationProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -114,9 +112,7 @@ public void testForwardAuthData() throws Exception { providers.add(BasicAuthenticationProvider.class.getName()); proxyConfig.setAuthenticationProviders(providers); - AuthenticationService authenticationService = new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)); - try (ProxyService proxyService = new ProxyService(proxyConfig, authenticationService)) { + try (ProxyService proxyService = new ProxyService(proxyConfig)) { proxyService.start(); try (PulsarClient proxyClient = createPulsarClient(proxyService.getServiceUrl(), clientAuthParams)) { proxyClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe(); @@ -128,11 +124,8 @@ public void testForwardAuthData() throws Exception { // Step 3: Create proxy with forwardAuthData enabled proxyConfig.setForwardAuthorizationCredentials(true); - authenticationService = new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)); - @Cleanup - ProxyService proxyService = new ProxyService(proxyConfig, authenticationService); + ProxyService proxyService = new ProxyService(proxyConfig); proxyService.start(); @Cleanup diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java index 246dd9f85e319..c20cd8e54aa37 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyIsAHttpProxyTest.java @@ -37,8 +37,11 @@ import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.core.Response; +import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.metadata.impl.ZKMetadataStore; @@ -57,6 +60,7 @@ import org.glassfish.jersey.client.ClientConfig; import org.glassfish.jersey.logging.LoggingFeature; +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -199,10 +203,8 @@ public void testSingleRedirect() throws Exception { props.setProperty("webServicePort", "0"); ProxyConfiguration proxyConfig = PulsarConfigurationLoader.create(props, ProxyConfiguration.class); - AuthenticationService authService = new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)); - WebServer webServer = new WebServer(proxyConfig, authService); + WebServer webServer = newWebServer(proxyConfig); ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null, new BrokerDiscoveryProvider(proxyConfig, resource)); webServer.start(); @@ -228,10 +230,7 @@ public void testMultipleRedirect() throws Exception { props.setProperty("webServicePort", "0"); ProxyConfiguration proxyConfig = PulsarConfigurationLoader.create(props, ProxyConfiguration.class); - AuthenticationService authService = new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)); - - WebServer webServer = new WebServer(proxyConfig, authService); + WebServer webServer = newWebServer(proxyConfig); ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null, new BrokerDiscoveryProvider(proxyConfig, resource)); webServer.start(); @@ -249,6 +248,17 @@ public void testMultipleRedirect() throws Exception { } } + @NotNull + private WebServer newWebServer(ProxyConfiguration proxyConfig) throws PulsarServerException { + AuthenticationService authService = new AuthenticationService( + PulsarConfigurationLoader.convertFrom(proxyConfig)); + AuthorizationService authzService = new AuthorizationService( + PulsarConfigurationLoader.convertFrom(proxyConfig), resource); + + WebServer webServer = new WebServer(proxyConfig, authService, authzService); + return webServer; + } + @Test(expectedExceptions = IllegalArgumentException.class) public void testTryingToUseExistingPath() throws Exception { Properties props = new Properties(); @@ -259,10 +269,7 @@ public void testTryingToUseExistingPath() throws Exception { props.setProperty("webServicePort", "0"); ProxyConfiguration proxyConfig = PulsarConfigurationLoader.create(props, ProxyConfiguration.class); - AuthenticationService authService = new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)); - - WebServer webServer = new WebServer(proxyConfig, authService); + WebServer webServer = newWebServer(proxyConfig); ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null, new BrokerDiscoveryProvider(proxyConfig, resource)); @@ -278,10 +285,7 @@ public void testLongPathInProxyTo() throws Exception { props.setProperty("webServicePort", "0"); ProxyConfiguration proxyConfig = PulsarConfigurationLoader.create(props, ProxyConfiguration.class); - AuthenticationService authService = new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)); - - WebServer webServer = new WebServer(proxyConfig, authService); + WebServer webServer = newWebServer(proxyConfig); ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null, new BrokerDiscoveryProvider(proxyConfig, resource)); webServer.start(); @@ -305,10 +309,7 @@ public void testProxyToEndsInSlash() throws Exception { props.setProperty("webServicePort", "0"); ProxyConfiguration proxyConfig = PulsarConfigurationLoader.create(props, ProxyConfiguration.class); - AuthenticationService authService = new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)); - - WebServer webServer = new WebServer(proxyConfig, authService); + WebServer webServer = newWebServer(proxyConfig); ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null, new BrokerDiscoveryProvider(proxyConfig, resource)); webServer.start(); @@ -331,10 +332,7 @@ public void testLongPath() throws Exception { props.setProperty("webServicePort", "0"); ProxyConfiguration proxyConfig = PulsarConfigurationLoader.create(props, ProxyConfiguration.class); - AuthenticationService authService = new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)); - - WebServer webServer = new WebServer(proxyConfig, authService); + WebServer webServer = newWebServer(proxyConfig); ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null, new BrokerDiscoveryProvider(proxyConfig, resource)); webServer.start(); @@ -356,15 +354,16 @@ public void testLongUri() throws Exception { props.setProperty("webServicePort", "0"); ProxyConfiguration proxyConfig = PulsarConfigurationLoader.create(props, ProxyConfiguration.class); - AuthenticationService authService = new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)); + final ServiceConfiguration conf = PulsarConfigurationLoader.convertFrom(proxyConfig); + AuthenticationService authService = new AuthenticationService(conf); + AuthorizationService authzService = new AuthorizationService(conf, null); StringBuilder longUri = new StringBuilder("/service3/tp"); for (int i = 10 * 1024; i > 0; i = i - 11){ longUri.append("_sub1_RETRY"); } - WebServer webServerMaxUriLen8k = new WebServer(proxyConfig, authService); + WebServer webServerMaxUriLen8k = new WebServer(proxyConfig, authService, authzService); ProxyServiceStarter.addWebServerHandlers(webServerMaxUriLen8k, proxyConfig, null, new BrokerDiscoveryProvider(proxyConfig, resource)); webServerMaxUriLen8k.start(); @@ -376,7 +375,7 @@ public void testLongUri() throws Exception { } proxyConfig.setHttpMaxRequestHeaderSize(12 * 1024); - WebServer webServerMaxUriLen12k = new WebServer(proxyConfig, authService); + WebServer webServerMaxUriLen12k = new WebServer(proxyConfig, authService, authzService); ProxyServiceStarter.addWebServerHandlers(webServerMaxUriLen12k, proxyConfig, null, new BrokerDiscoveryProvider(proxyConfig, resource)); webServerMaxUriLen12k.start(); @@ -397,10 +396,7 @@ public void testPathEndsInSlash() throws Exception { props.setProperty("webServicePort", "0"); ProxyConfiguration proxyConfig = PulsarConfigurationLoader.create(props, ProxyConfiguration.class); - AuthenticationService authService = new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)); - - WebServer webServer = new WebServer(proxyConfig, authService); + WebServer webServer = newWebServer(proxyConfig); ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null, new BrokerDiscoveryProvider(proxyConfig, resource)); webServer.start(); @@ -429,10 +425,7 @@ public void testStreaming() throws Exception { props.setProperty("webServicePort", "0"); ProxyConfiguration proxyConfig = PulsarConfigurationLoader.create(props, ProxyConfiguration.class); - AuthenticationService authService = new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)); - - WebServer webServer = new WebServer(proxyConfig, authService); + WebServer webServer = newWebServer(proxyConfig); ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, null, new BrokerDiscoveryProvider(proxyConfig, resource)); webServer.start(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java index 88e7b269d6eeb..26975f382963d 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithAuth.java @@ -32,7 +32,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationProviderTls; -import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -41,7 +40,6 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls; -import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.mockito.Mockito; @@ -87,9 +85,7 @@ protected void setup() throws Exception { providers.add(AuthenticationProviderTls.class.getName()); proxyConfig.setAuthenticationProviders(providers); - proxyService = Mockito.spy(new ProxyService(proxyConfig, - new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + proxyService = Mockito.spy(new ProxyService(proxyConfig)); doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java index 5feef74e3b94b..cef5ec46531f9 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTestWithoutAuth.java @@ -28,7 +28,6 @@ import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; -import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -37,7 +36,6 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls; -import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.mockito.Mockito; @@ -75,8 +73,7 @@ protected void setup() throws Exception { proxyConfig.setMetadataStoreUrl(DUMMY_VALUE); proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); - proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + proxyService = Mockito.spy(new ProxyService(proxyConfig)); doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java index 5c4e40ed65a70..34f1bed42f19b 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java @@ -23,12 +23,10 @@ import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; -import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.mockito.Mockito; import org.testng.annotations.AfterMethod; @@ -86,9 +84,7 @@ protected void setup() throws Exception { proxyConfig.setBrokerClientTlsTrustStore(BROKER_TRUSTSTORE_FILE_PATH); proxyConfig.setBrokerClientTlsTrustStorePassword(BROKER_TRUSTSTORE_PW); - proxyService = Mockito.spy(new ProxyService(proxyConfig, - new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + proxyService = Mockito.spy(new ProxyService(proxyConfig)); doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java index 4861117ef6ff5..ef3b0fa7d2639 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java @@ -27,11 +27,9 @@ import lombok.Cleanup; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; -import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.mockito.Mockito; import org.testng.Assert; @@ -58,9 +56,7 @@ protected void setup() throws Exception { proxyConfig.setMaxConcurrentLookupRequests(NUM_CONCURRENT_LOOKUP); proxyConfig.setMaxConcurrentInboundConnections(NUM_CONCURRENT_INBOUND_CONNECTION); - AuthenticationService authenticationService = new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)); - proxyService = Mockito.spy(new ProxyService(proxyConfig, authenticationService)); + proxyService = Mockito.spy(new ProxyService(proxyConfig)); doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java index ad237c2539700..e57cc9950b281 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java @@ -25,13 +25,11 @@ import java.util.concurrent.TimeUnit; import lombok.Cleanup; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; -import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.auth.AuthenticationTls; -import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.mockito.Mockito; import org.testng.annotations.AfterClass; @@ -67,8 +65,7 @@ protected void setup() throws Exception { proxyConfig.setTlsRequireTrustedClientCertOnConnect(true); proxyConfig.setTlsAllowInsecureConnection(false); - proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + proxyService = Mockito.spy(new ProxyService(proxyConfig)); doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java index 82cd702aa7f0a..58a9ea250bec8 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java @@ -33,7 +33,6 @@ import lombok.Cleanup; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; -import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageRoutingMode; @@ -47,7 +46,6 @@ import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.api.proto.CommandActiveConsumerChange; import org.apache.pulsar.common.api.proto.ProtocolVersion; -import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.apache.pulsar.metadata.impl.ZKMetadataStore; @@ -77,8 +75,7 @@ protected void setup() throws Exception { //enable full parsing feature proxyConfig.setProxyLogLevel(Optional.ofNullable(2)); - proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + proxyService = Mockito.spy(new ProxyService(proxyConfig)); doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java index 6948996ad4636..e16a187583f78 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java @@ -41,8 +41,6 @@ import lombok.Cleanup; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; -import org.apache.pulsar.broker.authentication.AuthenticationService; -import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.awaitility.Awaitility; import org.glassfish.jersey.client.ClientConfig; @@ -72,8 +70,7 @@ protected void setup() throws Exception { proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); proxyConfig.setClusterName(TEST_CLUSTER); - proxyService = Mockito.spy(new ProxyService(proxyConfig, - new AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig)))); + proxyService = Mockito.spy(new ProxyService(proxyConfig)); doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); @@ -81,10 +78,7 @@ protected void setup() throws Exception { proxyService.addPrometheusRawMetricsProvider(stream -> stream.write("test_metrics{label1=\"xyz\"} 10 \n")); - AuthenticationService authService = new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)); - - proxyWebServer = new WebServer(proxyConfig, authService); + proxyWebServer = new WebServer(proxyService); ProxyServiceStarter.addWebServerHandlers(proxyWebServer, proxyConfig, proxyService, null); proxyWebServer.start(); } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java index bde989fc432f9..259d3ed43ad7d 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java @@ -32,7 +32,6 @@ import javax.crypto.SecretKey; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; -import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.ProducerConsumerBase; @@ -40,7 +39,6 @@ import org.apache.pulsar.client.impl.ClientCnx; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.auth.AuthenticationToken; -import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.awaitility.Awaitility; @@ -124,9 +122,7 @@ protected void setup() throws Exception { properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY)); proxyConfig.setProperties(properties); - proxyService = Mockito.spy(new ProxyService(proxyConfig, - new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + proxyService = Mockito.spy(new ProxyService(proxyConfig)); } @AfterClass(alwaysRun = true) diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java index 2c8c382b6a5ef..2cb12d179e76a 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java @@ -20,26 +20,18 @@ import static org.mockito.Mockito.spy; import com.google.common.collect.Sets; -import java.io.IOException; -import java.util.HashMap; + import java.util.HashSet; -import java.util.Map; -import java.util.Map.Entry; import java.util.Optional; import java.util.Set; -import javax.naming.AuthenticationException; -import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.broker.authentication.AuthenticationDataSource; -import org.apache.pulsar.broker.authentication.AuthenticationProvider; -import org.apache.pulsar.broker.authentication.AuthenticationService; + import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.api.Authentication; -import org.apache.pulsar.client.api.AuthenticationDataProvider; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.proxy.server.mocks.BasicAuthentication; +import org.apache.pulsar.proxy.server.mocks.BasicAuthenticationProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -50,94 +42,6 @@ public class ProxyRolesEnforcementTest extends ProducerConsumerBase { private static final Logger log = LoggerFactory.getLogger(ProxyRolesEnforcementTest.class); - public static class BasicAuthenticationData implements AuthenticationDataProvider { - private final String authParam; - - public BasicAuthenticationData(String authParam) { - this.authParam = authParam; - } - - public boolean hasDataFromCommand() { - return true; - } - - public String getCommandData() { - return authParam; - } - - public boolean hasDataForHttp() { - return true; - } - - @Override - public Set> getHttpHeaders() { - Map headers = new HashMap<>(); - headers.put("BasicAuthentication", authParam); - return headers.entrySet(); - } - } - - public static class BasicAuthentication implements Authentication { - - private String authParam; - - @Override - public void close() throws IOException { - // noop - } - - @Override - public String getAuthMethodName() { - return "BasicAuthentication"; - } - - @Override - public AuthenticationDataProvider getAuthData() throws PulsarClientException { - try { - return new BasicAuthenticationData(authParam); - } catch (Exception e) { - throw new PulsarClientException(e); - } - } - - @Override - public void configure(Map authParams) { - this.authParam = authParams.get("authParam"); - } - - @Override - public void start() throws PulsarClientException { - // noop - } - } - - public static class BasicAuthenticationProvider implements AuthenticationProvider { - - @Override - public void close() throws IOException { - } - - @Override - public void initialize(ServiceConfiguration config) throws IOException { - } - - @Override - public String getAuthMethodName() { - return "BasicAuthentication"; - } - - @Override - public String authenticate(AuthenticationDataSource authData) throws AuthenticationException { - if (authData.hasDataFromCommand()) { - return authData.getCommandData(); - } else if (authData.hasDataFromHttp()) { - return authData.getHttpHeader("BasicAuthentication"); - } - - return null; - } - } - @BeforeMethod @Override protected void setup() throws Exception { @@ -217,9 +121,7 @@ public void testIncorrectRoles() throws Exception { providers.add(BasicAuthenticationProvider.class.getName()); proxyConfig.setAuthenticationProviders(providers); - try (ProxyService proxyService = new ProxyService(proxyConfig, - new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))) { + try (ProxyService proxyService = new ProxyService(proxyConfig)) { proxyService.start(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsAuthenticationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsAuthenticationTest.java new file mode 100644 index 0000000000000..607482d5475a7 --- /dev/null +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsAuthenticationTest.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.proxy.server; + + +public class ProxyStatsAuthenticationTest extends ProxyStatsTest { + + public ProxyStatsAuthenticationTest() { + super(true); + } +} diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsDisabledTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsDisabledTest.java new file mode 100644 index 0000000000000..e5714c1656218 --- /dev/null +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsDisabledTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.proxy.server; + +import static org.mockito.Mockito.doReturn; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.Invocation; +import javax.ws.rs.core.Response; +import java.util.Optional; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; +import org.apache.pulsar.metadata.impl.ZKMetadataStore; +import org.glassfish.jersey.client.ClientConfig; +import org.glassfish.jersey.logging.LoggingFeature; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class ProxyStatsDisabledTest extends ProducerConsumerBase { + + private ProxyService proxyService; + private WebServer proxyWebServer; + private Client httpClient; + + @BeforeClass(alwaysRun = true) + @Override + protected void setup() throws Exception { + conf.setClusterName("test"); + super.init(); + ProxyConfiguration proxyConfig = new ProxyConfiguration(); + + proxyConfig.setServicePort(Optional.of(0)); + proxyConfig.setBrokerProxyAllowedTargetPorts("*"); + proxyConfig.setWebServicePort(Optional.of(0)); + proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl()); + proxyConfig.setMetadataStoreUrl(DUMMY_VALUE); + proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); + proxyConfig.setProxyLogLevel(Optional.of(2)); + proxyConfig.setExposeProxyStatsEndpoint(false); + AuthenticationService authService = new AuthenticationService( + PulsarConfigurationLoader.convertFrom(proxyConfig)); + + + proxyService = Mockito.spy(new ProxyService(proxyConfig)); + doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); + doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); + proxyService.start(); + + proxyWebServer = new WebServer(proxyService); + ProxyServiceStarter.addWebServerHandlers(proxyWebServer, proxyConfig, proxyService, null); + proxyWebServer.start(); + + httpClient = ClientBuilder + .newClient(new ClientConfig().register(LoggingFeature.class)); + } + + @Override + protected ServiceConfiguration getDefaultConf() { + ServiceConfiguration conf = super.getDefaultConf(); + // wait for shutdown of the broker, this prevents flakiness which could be caused by metrics being + // unregistered asynchronously. This impacts the execution of the next test method if this would be happening. + conf.setBrokerShutdownTimeoutMs(5000L); + return conf; + } + + @Override + @AfterClass(alwaysRun = true) + protected void cleanup() throws Exception { + internalCleanup(); + proxyService.close(); + httpClient.close(); + } + + @Test + public void testEndpoints() throws Exception { + Assert.assertEquals(httpBuilder("/proxy-stats").get() + .getStatus(), Response.Status.NOT_FOUND.getStatusCode()); + Assert.assertEquals(httpBuilder("/proxy-stats/connections").get() + .getStatus(), Response.Status.NOT_FOUND.getStatusCode()); + Assert.assertEquals(httpBuilder("/proxy-stats/topics").get() + .getStatus(), Response.Status.NOT_FOUND.getStatusCode()); + Assert.assertEquals(httpBuilder("/proxy-stats/logging").get() + .getStatus(), Response.Status.NOT_FOUND.getStatusCode()); + } + + private Invocation.Builder httpBuilder(String path) { + return httpClient + .target(proxyWebServer.getServiceUri()) + .path(path) + .request(); + } +} diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsNoAuthenticationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsNoAuthenticationTest.java new file mode 100644 index 0000000000000..bb099268d7a6f --- /dev/null +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsNoAuthenticationTest.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.proxy.server; + + +public class ProxyStatsNoAuthenticationTest extends ProxyStatsTest { + + public ProxyStatsNoAuthenticationTest() { + super(false); + } +} diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java index 140af88aae71b..c45dff7843e7a 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java @@ -20,32 +20,39 @@ import static java.util.Objects.requireNonNull; import static org.mockito.Mockito.doReturn; -import static org.testng.Assert.assertEquals; +import static org.mockito.Mockito.spy; import static org.testng.Assert.assertNotNull; +import com.google.common.collect.Sets; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; - +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.TimeUnit; import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.client.Entity; +import javax.ws.rs.client.Invocation; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import lombok.Cleanup; import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; -import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; +import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.metadata.impl.ZKMetadataStore; +import org.apache.pulsar.proxy.server.mocks.BasicAuthentication; +import org.apache.pulsar.proxy.server.mocks.BasicAuthenticationData; +import org.apache.pulsar.proxy.server.mocks.BasicAuthenticationProvider; import org.apache.pulsar.proxy.stats.ConnectionStats; import org.apache.pulsar.proxy.stats.TopicStats; import org.glassfish.jersey.client.ClientConfig; @@ -56,40 +63,95 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -public class ProxyStatsTest extends MockedPulsarServiceBaseTest { +public abstract class ProxyStatsTest extends ProducerConsumerBase { private ProxyService proxyService; private WebServer proxyWebServer; - private final ProxyConfiguration proxyConfig = new ProxyConfiguration(); + private Client httpClient; + + private final boolean authEnabled; + + public ProxyStatsTest(boolean authEnabled) { + this.authEnabled = authEnabled; + } + @BeforeClass(alwaysRun = true) @Override - @BeforeClass protected void setup() throws Exception { - internalSetup(); + if (authEnabled) { + conf.setAuthenticationEnabled(true); + conf.setAuthorizationEnabled(true); + conf.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName()); + conf.setBrokerClientAuthenticationParameters("authParam:admin"); + conf.setAuthenticateOriginalAuthData(false); + + + Set superUserRoles = new HashSet(); + superUserRoles.add("admin"); + superUserRoles.add("proxy"); + conf.setSuperUserRoles(superUserRoles); + + Set providers = new HashSet(); + providers.add(BasicAuthenticationProvider.class.getName()); + conf.setAuthenticationProviders(providers); + Set proxyRoles = new HashSet(); + proxyRoles.add("proxy"); + conf.setProxyRoles(proxyRoles); + } + conf.setClusterName("test"); + + super.init(); + + + if (authEnabled) { + String adminAuthParams = "authParam:admin"; + admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()) + .authentication(BasicAuthentication.class.getName(), adminAuthParams).build()); + } + producerBaseSetup(); + + + if (authEnabled) { + + String namespaceName = "my-property/my-ns"; + admin.namespaces().grantPermissionOnNamespace(namespaceName, "client", + Sets.newHashSet(AuthAction.consume, AuthAction.produce)); + } + + ProxyConfiguration proxyConfig = new ProxyConfiguration(); proxyConfig.setServicePort(Optional.of(0)); proxyConfig.setBrokerProxyAllowedTargetPorts("*"); proxyConfig.setWebServicePort(Optional.of(0)); + proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl()); proxyConfig.setMetadataStoreUrl(DUMMY_VALUE); proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); - // enable full parsing feature proxyConfig.setProxyLogLevel(Optional.of(2)); + if (authEnabled) { + proxyConfig.setSuperUserRoles(Set.of("admin")); + proxyConfig.setAuthenticationEnabled(true); + proxyConfig.setAuthorizationEnabled(true); + proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName()); + String proxyAuthParams = "authParam:proxy"; + proxyConfig.setBrokerClientAuthenticationParameters(proxyAuthParams); + Set providers = new HashSet(); + providers.add(BasicAuthenticationProvider.class.getName()); + proxyConfig.setAuthenticationProviders(providers); + proxyConfig.setForwardAuthorizationCredentials(true); + } - proxyService = Mockito.spy(new ProxyService(proxyConfig, - new AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig)))); + proxyService = Mockito.spy(new ProxyService(proxyConfig)); doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); - - Optional proxyLogLevel = Optional.of(2); - assertEquals(proxyLogLevel, proxyService.getConfiguration().getProxyLogLevel()); proxyService.start(); - AuthenticationService authService = new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)); - - proxyWebServer = new WebServer(proxyConfig, authService); + proxyWebServer = new WebServer(proxyService); ProxyServiceStarter.addWebServerHandlers(proxyWebServer, proxyConfig, proxyService, null); proxyWebServer.start(); + + httpClient = ClientBuilder + .newClient(new ClientConfig().register(LoggingFeature.class)); + } @Override @@ -106,23 +168,20 @@ protected ServiceConfiguration getDefaultConf() { protected void cleanup() throws Exception { internalCleanup(); proxyService.close(); + httpClient.close(); } - /** - * Validates proxy connection stats api. - * - * @throws Exception - */ @Test public void testConnectionsStats() throws Exception { - final String topicName1 = "persistent://sample/test/local/connections-stats"; + String topicName1 = "persistent://my-property/my-ns/my-topic1"; + @Cleanup - PulsarClient client = PulsarClient.builder().serviceUrl(proxyService.getServiceUrl()).build(); + PulsarClient client = createPulsarClient(); Producer producer = client.newProducer(Schema.BYTES).topic(topicName1).enableBatching(false) .messageRoutingMode(MessageRoutingMode.SinglePartition).create(); - // Create a consumer directly attached to broker - Consumer consumer = pulsarClient.newConsumer().topic(topicName1).subscriptionName("my-sub").subscribe(); + @Cleanup + Consumer consumer = client.newConsumer().topic(topicName1).subscriptionName("my-sub").subscribe(); int totalMessages = 10; for (int i = 0; i < totalMessages; i++) { @@ -134,43 +193,50 @@ public void testConnectionsStats() throws Exception { requireNonNull(msg); consumer.acknowledge(msg); } + if (authEnabled) { + assertGetConnectionStats(null, false); + assertGetConnectionStats("notexists", false); + assertGetConnectionStats("client", false); + assertGetConnectionStats("admin", true); + } else { + assertGetConnectionStats(null, true); + } + } - @Cleanup - Client httpClient = ClientBuilder.newClient(new ClientConfig().register(LoggingFeature.class)); - Response r = httpClient.target(proxyWebServer.getServiceUri()).path("/proxy-stats/connections").request() - .get(); - Assert.assertEquals(r.getStatus(), Response.Status.OK.getStatusCode()); - String response = r.readEntity(String.class).trim(); - List connectionStats = new Gson().fromJson(response, new TypeToken>() { - }.getType()); - - assertNotNull(connectionStats); - - consumer.close(); + private void assertGetConnectionStats(String role, boolean expectOk) { + if (!expectOk) { + Assert.assertEquals(httpBuilder("/proxy-stats/connections", role).get() + .getStatus(), Response.Status.FORBIDDEN.getStatusCode()); + } else { + Response r = httpBuilder("/proxy-stats/connections", role).get(); + Assert.assertEquals(r.getStatus(), Response.Status.OK.getStatusCode()); + String response = r.readEntity(String.class).trim(); + List connectionStats = new Gson().fromJson(response, new TypeToken>() { + }.getType()); + assertNotNull(connectionStats); + } } - /** - * Validate proxy topic stats api - * - * @throws Exception - */ @Test public void testTopicStats() throws Exception { proxyService.setProxyLogLevel(2); - final String topicName = "persistent://sample/test/local/topic-stats"; - final String topicName2 = "persistent://sample/test/local/topic-stats-2"; + final String topicName = "persistent://my-property/my-ns/my-topic1"; + final String topicName2 = "persistent://my-property/my-ns/my-topic2"; @Cleanup - PulsarClient client = PulsarClient.builder().serviceUrl(proxyService.getServiceUrl()).build(); + PulsarClient client = createPulsarClient(); + @Cleanup Producer producer1 = client.newProducer(Schema.BYTES).topic(topicName).enableBatching(false) .producerName("producer1").messageRoutingMode(MessageRoutingMode.SinglePartition).create(); + @Cleanup Producer producer2 = client.newProducer(Schema.BYTES).topic(topicName2).enableBatching(false) .producerName("producer2").messageRoutingMode(MessageRoutingMode.SinglePartition).create(); - // Create a consumer directly attached to broker - Consumer consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe(); - Consumer consumer2 = pulsarClient.newConsumer().topic(topicName2).subscriptionName("my-sub") + @Cleanup + Consumer consumer = client.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe(); + @Cleanup + Consumer consumer2 = client.newConsumer().topic(topicName2).subscriptionName("my-sub") .subscribe(); int totalMessages = 10; @@ -185,37 +251,101 @@ public void testTopicStats() throws Exception { consumer.acknowledge(msg); msg = consumer2.receive(1, TimeUnit.SECONDS); } + if (authEnabled) { + assertGetTopicsStats(topicName, null, false); + assertGetTopicsStats(topicName, "notexists", false); + assertGetTopicsStats(topicName, "client", false); + assertGetTopicsStats(topicName, "admin", true); + } else { + assertGetTopicsStats(topicName, null, true); + } + } - @Cleanup - Client httpClient = ClientBuilder.newClient(new ClientConfig().register(LoggingFeature.class)); - Response r = httpClient.target(proxyWebServer.getServiceUri()).path("/proxy-stats/topics").request() - .get(); - Assert.assertEquals(r.getStatus(), Response.Status.OK.getStatusCode()); - String response = r.readEntity(String.class).trim(); - Map topicStats = new Gson().fromJson(response, new TypeToken>() { - }.getType()); - - assertNotNull(topicStats.get(topicName)); - - consumer.close(); - consumer2.close(); + private void assertGetTopicsStats(String topicName, String role, boolean expectOk) { + if (!expectOk) { + Assert.assertEquals(httpBuilder("/proxy-stats/topics", role).get() + .getStatus(), Response.Status.FORBIDDEN.getStatusCode()); + + } else { + Response r = httpBuilder("/proxy-stats/topics", role).get(); + Assert.assertEquals(r.getStatus(), Response.Status.OK.getStatusCode()); + String response = r.readEntity(String.class).trim(); + Map topicStats = new Gson().fromJson(response, new TypeToken>() { + }.getType()); + + assertNotNull(topicStats.get(topicName)); + } } - /** - * Change proxy log level dynamically - * - * @throws Exception - */ @Test public void testChangeLogLevel() { Assert.assertEquals(proxyService.getProxyLogLevel(), 2); int newLogLevel = 1; - @Cleanup - Client httpClient = ClientBuilder.newClient(new ClientConfig().register(LoggingFeature.class)); - Response r = httpClient.target(proxyWebServer.getServiceUri()).path("/proxy-stats/logging/" + newLogLevel) - .request().post(Entity.entity("", MediaType.APPLICATION_JSON)); - Assert.assertEquals(r.getStatus(), Response.Status.NO_CONTENT.getStatusCode()); - Assert.assertEquals(proxyService.getProxyLogLevel(), newLogLevel); + + if (authEnabled) { + assertUpdateLogLevel(newLogLevel, null, false); + assertUpdateLogLevel(newLogLevel, "notexists", false); + assertUpdateLogLevel(newLogLevel, "client", false); + assertUpdateLogLevel(newLogLevel, "admin", true); + + assertGetLogLevel(newLogLevel, null, false); + assertGetLogLevel(newLogLevel, "notexists", false); + assertGetLogLevel(newLogLevel, "client", false); + assertGetLogLevel(newLogLevel, "admin", true); + } else { + assertUpdateLogLevel(newLogLevel, null, true); + assertGetLogLevel(newLogLevel, null, true); + } + } + + private void assertGetLogLevel(int expectedLevel, String role, boolean expectOk) { + if (!expectOk) { + Assert.assertEquals(httpBuilder("/proxy-stats/logging", role).get() + .getStatus(), Response.Status.FORBIDDEN.getStatusCode()); + } else { + final Response getResponse = httpBuilder("/proxy-stats/logging", role) + .get(); + Assert.assertEquals(getResponse.getStatus(), Response.Status.OK.getStatusCode()); + Assert.assertEquals(getResponse.readEntity(int.class), expectedLevel); + } + } + + private void assertUpdateLogLevel(int newLogLevel, String role, boolean expectOk) { + final String postUrl = "/proxy-stats/logging/" + newLogLevel; + if (!expectOk) { + Assert.assertEquals(httpBuilder(postUrl, role) + .post(Entity.entity("", MediaType.APPLICATION_JSON)) + .getStatus(), Response.Status.FORBIDDEN.getStatusCode()); + Assert.assertNotEquals(proxyService.getProxyLogLevel(), newLogLevel); + } else { + Response r = httpBuilder(postUrl, role) + .post(Entity.entity("", MediaType.APPLICATION_JSON)); + Assert.assertEquals(r.getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + Assert.assertEquals(proxyService.getProxyLogLevel(), newLogLevel); + } + } + + + private Invocation.Builder httpBuilder(String path, String role) { + BasicAuthenticationData auth = + new BasicAuthenticationData(role); + final Invocation.Builder builder = httpClient + .target(proxyWebServer.getServiceUri()) + .path(path) + .request(); + + Set> headers = auth.getHttpHeaders(); + if (headers != null) { + headers.forEach(entry -> builder.header(entry.getKey(), entry.getValue())); + } + return builder; + } + + + + private PulsarClient createPulsarClient() throws PulsarClientException { + return PulsarClient.builder().serviceUrl(proxyService.getServiceUrl()) + .authentication(BasicAuthentication.class.getName(), "authParam:client").build(); } } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java index 97279659af626..ed94a307cf7cf 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java @@ -27,7 +27,6 @@ import lombok.Cleanup; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; -import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.KeySharedPolicy; import org.apache.pulsar.client.api.Message; @@ -38,7 +37,6 @@ import org.apache.pulsar.client.api.Range; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.BinaryProtoLookupService; -import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.mockito.Mockito; import org.slf4j.Logger; @@ -86,8 +84,7 @@ protected void setup() throws Exception { } private void startProxyService() throws Exception { - proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig))) { + proxyService = Mockito.spy(new ProxyService(proxyConfig) { @Override protected LookupProxyHandler newLookupProxyHandler(ProxyConnection proxyConnection) { return new TestLookupProxyHandler(this, proxyConnection); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java index 237cf5a48112c..f45d5ff97437e 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java @@ -37,7 +37,6 @@ import org.apache.avro.reflect.Nullable; import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; -import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageRoutingMode; @@ -52,7 +51,6 @@ import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.api.proto.CommandActiveConsumerChange; import org.apache.pulsar.common.api.proto.ProtocolVersion; -import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TopicType; @@ -95,8 +93,7 @@ protected void setup() throws Exception { proxyConfig.setMetadataStoreUrl(DUMMY_VALUE); proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); - proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + proxyService = Mockito.spy(new ProxyService(proxyConfig)); doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java index 64b0cd6b1a610..a7c0f10f72e3e 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java @@ -26,14 +26,12 @@ import lombok.Cleanup; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; -import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.mockito.Mockito; @@ -62,8 +60,7 @@ protected void setup() throws Exception { proxyConfig.setMetadataStoreUrl(DUMMY_VALUE); proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); - proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + proxyService = Mockito.spy(new ProxyService(proxyConfig)); doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java index f77c0eeb2d41c..f5c58acda8aee 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTestWithAuth.java @@ -25,8 +25,6 @@ import java.util.Optional; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; -import org.apache.pulsar.broker.authentication.AuthenticationService; -import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.mockito.Mockito; import org.testng.annotations.AfterClass; @@ -69,8 +67,7 @@ protected void setup() throws Exception { " \"audience\": \"https://dev-kt-aa9ne.us.auth0.com/api/v2/\"," + " \"privateKey\":\"file://" + tempFile.getAbsolutePath() + "\"}"); - proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + proxyService = Mockito.spy(new ProxyService(proxyConfig)); doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore(); doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java index e8bb128c8c190..0c978dd1e9266 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java @@ -31,7 +31,6 @@ import lombok.Cleanup; import org.apache.pulsar.broker.authentication.AuthenticationProviderTls; -import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.Consumer; @@ -42,7 +41,6 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.auth.AuthenticationTls; -import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; @@ -132,9 +130,7 @@ protected void setup() throws Exception { proxyConfig.setAuthenticationProviders(providers); - AuthenticationService authenticationService = new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)); - proxyService = Mockito.spy(new ProxyService(proxyConfig, authenticationService)); + proxyService = Mockito.spy(new ProxyService(proxyConfig)); proxyService.start(); } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java index 31757cc036720..f76c7652b4708 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java @@ -33,7 +33,6 @@ import lombok.Cleanup; import org.apache.pulsar.broker.authentication.AuthenticationProviderTls; import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; -import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -48,7 +47,6 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.auth.AuthenticationTls; import org.apache.pulsar.client.impl.auth.AuthenticationToken; -import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; @@ -226,10 +224,7 @@ protected void setup() throws Exception { properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(SECRET_KEY)); proxyConfig.setProperties(properties); - AuthenticationService authService = - new AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig)); - proxyService = Mockito.spy(new ProxyService(proxyConfig, authService)); - webServer = new WebServer(proxyConfig, authService); + proxyService = Mockito.spy(new ProxyService(proxyConfig)); } @AfterMethod(alwaysRun = true) @@ -455,9 +450,7 @@ public void tlsCiphersAndProtocols(Set tlsCiphers, Set tlsProtoc proxyConfig.setTlsProtocols(tlsProtocols); proxyConfig.setTlsCiphers(tlsCiphers); - ProxyService proxyService = Mockito.spy(new ProxyService(proxyConfig, - new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + ProxyService proxyService = Mockito.spy(new ProxyService(proxyConfig)); try { proxyService.start(); } catch (Exception ex) { diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java index e912006faa022..2efd330879ebc 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java @@ -35,13 +35,11 @@ import javax.ws.rs.core.Response; import lombok.Cleanup; import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; -import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.impl.auth.AuthenticationToken; -import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.SubscriptionAuthMode; @@ -117,10 +115,8 @@ protected void setup() throws Exception { proxyConfig.setBrokerClientAuthenticationParameters(PROXY_TOKEN); proxyConfig.setAuthenticationProviders(providers); - AuthenticationService authService = - new AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig)); - proxyService = Mockito.spy(new ProxyService(proxyConfig, authService)); - webServer = new WebServer(proxyConfig, authService); + proxyService = Mockito.spy(new ProxyService(proxyConfig)); + webServer = new WebServer(proxyService); } @AfterMethod(alwaysRun = true) @@ -411,10 +407,9 @@ void testGetMetrics() throws Exception { startProxy(); PulsarResources resource = new PulsarResources(new ZKMetadataStore(mockZooKeeper), new ZKMetadataStore(mockZooKeeperGlobal)); - AuthenticationService authService = new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)); proxyConfig.setAuthenticateMetricsEndpoint(false); - WebServer webServer = new WebServer(proxyConfig, authService); + WebServer webServer = new WebServer(proxyConfig, + proxyService.getAuthenticationService(), proxyService.getAuthorizationService()); ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, proxyService, new BrokerDiscoveryProvider(proxyConfig, resource)); webServer.start(); @@ -427,7 +422,8 @@ void testGetMetrics() throws Exception { webServer.stop(); } proxyConfig.setAuthenticateMetricsEndpoint(true); - webServer = new WebServer(proxyConfig, authService); + webServer = new WebServer(proxyConfig, + proxyService.getAuthenticationService(), proxyService.getAuthorizationService()); ProxyServiceStarter.addWebServerHandlers(webServer, proxyConfig, proxyService, new BrokerDiscoveryProvider(proxyConfig, resource)); webServer.start(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java index 9c8e2ba33c9e8..fd3a8a5305e05 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java @@ -30,7 +30,6 @@ import lombok.Cleanup; import org.apache.pulsar.broker.authentication.AuthenticationProviderTls; -import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.Consumer; @@ -40,7 +39,6 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.auth.AuthenticationTls; -import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.mockito.Mockito; @@ -119,9 +117,7 @@ protected void setup() throws Exception { proxyConfig.setAuthenticationProviders(providers); - proxyService = Mockito.spy(new ProxyService(proxyConfig, - new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig)))); + proxyService = Mockito.spy(new ProxyService(proxyConfig)); proxyService.start(); } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java index d3291c8fb910d..21b9183861f07 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java @@ -26,9 +26,11 @@ import java.util.Optional; +import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationProviderTls; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -95,8 +97,9 @@ protected void setup() throws Exception { resource = new PulsarResources(new ZKMetadataStore(mockZooKeeper), new ZKMetadataStore(mockZooKeeperGlobal)); - webServer = new WebServer(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig))); + final ServiceConfiguration serviceConf = PulsarConfigurationLoader.convertFrom(proxyConfig); + webServer = new WebServer(proxyConfig, new AuthenticationService(serviceConf), + new AuthorizationService(serviceConf, resource)); discoveryProvider = spy(new BrokerDiscoveryProvider(proxyConfig, resource)); LoadManagerReport report = new LoadReport(brokerUrl.toString(), brokerUrlTls.toString(), null, null); doReturn(report).when(discoveryProvider).nextBroker(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java index aa4aeaa2ea887..c63e2518ca441 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java @@ -31,8 +31,10 @@ import javax.ws.rs.client.WebTarget; import lombok.Cleanup; +import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; @@ -76,11 +78,11 @@ protected void setup() throws Exception { proxyConfig.setMetadataStoreUrl(DUMMY_VALUE); proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); - webServer = new WebServer(proxyConfig, new AuthenticationService( - PulsarConfigurationLoader.convertFrom(proxyConfig))); - resource = new PulsarResources(new ZKMetadataStore(mockZooKeeper), new ZKMetadataStore(mockZooKeeperGlobal)); + final ServiceConfiguration serviceConf = PulsarConfigurationLoader.convertFrom(proxyConfig); + webServer = new WebServer(proxyConfig, new AuthenticationService(serviceConf), + new AuthorizationService(serviceConf, resource)); discoveryProvider = spy(new BrokerDiscoveryProvider(proxyConfig, resource)); adminProxyHandler = new AdminProxyWrapper(proxyConfig, discoveryProvider); ServletHolder servletHolder = new ServletHolder(adminProxyHandler); @@ -88,7 +90,7 @@ protected void setup() throws Exception { webServer.addServlet("/lookup", servletHolder); webServer.addRestResource("/", VipStatus.ATTRIBUTE_STATUS_FILE_PATH, proxyConfig.getStatusFilePath(), - VipStatus.class); + VipStatus.class, false, false); // start web-service webServer.start(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/mocks/BasicAuthentication.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/mocks/BasicAuthentication.java new file mode 100644 index 0000000000000..373079916ed0f --- /dev/null +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/mocks/BasicAuthentication.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.proxy.server.mocks; + +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationDataProvider; +import org.apache.pulsar.client.api.PulsarClientException; + +import java.io.IOException; +import java.util.Map; + +public class BasicAuthentication implements Authentication { + + private String authParam; + + @Override + public void close() throws IOException { + // noop + } + + @Override + public String getAuthMethodName() { + return "BasicAuthentication"; + } + + @Override + public AuthenticationDataProvider getAuthData() throws PulsarClientException { + try { + return new BasicAuthenticationData(authParam); + } catch (Exception e) { + throw new PulsarClientException(e); + } + } + + @Override + public void configure(Map authParams) { + this.authParam = authParams.get("authParam"); + } + + @Override + public void start() throws PulsarClientException { + // noop + } +} diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/mocks/BasicAuthenticationData.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/mocks/BasicAuthenticationData.java new file mode 100644 index 0000000000000..78d201d652ab0 --- /dev/null +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/mocks/BasicAuthenticationData.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.proxy.server.mocks; + +import org.apache.pulsar.client.api.AuthenticationDataProvider; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class BasicAuthenticationData implements AuthenticationDataProvider { + private final String authParam; + + public BasicAuthenticationData(String authParam) { + this.authParam = authParam; + } + + public boolean hasDataFromCommand() { + return true; + } + + public String getCommandData() { + return authParam; + } + + public boolean hasDataForHttp() { + return true; + } + + @Override + public Set> getHttpHeaders() { + Map headers = new HashMap<>(); + headers.put("BasicAuthentication", authParam); + return headers.entrySet(); + } +} diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/mocks/BasicAuthenticationProvider.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/mocks/BasicAuthenticationProvider.java new file mode 100644 index 0000000000000..4991f49702382 --- /dev/null +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/mocks/BasicAuthenticationProvider.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.proxy.server.mocks; + +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.authentication.AuthenticationProvider; + +import javax.naming.AuthenticationException; +import java.io.IOException; + +public class BasicAuthenticationProvider implements AuthenticationProvider { + + @Override + public void close() throws IOException { + } + + @Override + public void initialize(ServiceConfiguration config) throws IOException { + } + + @Override + public String getAuthMethodName() { + return "BasicAuthentication"; + } + + @Override + public String authenticate(AuthenticationDataSource authData) throws AuthenticationException { + if (authData.hasDataFromCommand()) { + return authData.getCommandData(); + } else if (authData.hasDataFromHttp()) { + return authData.getHttpHeader("BasicAuthentication"); + } + + return null; + } +}