Skip to content

Commit

Permalink
Remove duplicate filter instances in Broker, Proxy and Function worke…
Browse files Browse the repository at this point in the history
…r web server

- There were filter instances for each context path which made
  maxConcurrentHttpRequests and httpRequestsMaxPerSecond not work as expected.
- Fixes the backpressure solution that is dependent on maxConcurrentHttpRequests and httpRequestsMaxPerSecond
  working properly
- Fix invalid Jersey api usage
  • Loading branch information
lhotari committed May 17, 2022
1 parent 08a7a3e commit e268d00
Show file tree
Hide file tree
Showing 9 changed files with 216 additions and 182 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,15 @@
import org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask;
import org.apache.pulsar.broker.loadbalance.LoadSheddingTask;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.lookup.v1.TopicLookup;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.protocol.ProtocolHandlers;
import org.apache.pulsar.broker.resourcegroup.ResourceGroupService;
import org.apache.pulsar.broker.resourcegroup.ResourceUsageTopicTransportManager;
import org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager;
import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.rest.Topics;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.GracefulExecutorServicesShutdown;
import org.apache.pulsar.broker.service.SystemTopicBaseTxnBufferSnapshotService;
Expand Down Expand Up @@ -881,21 +883,21 @@ private void addWebServerHandlers(WebService webService,
// Ensure the VIP status is only visible when the broker is fully initialized
return state == State.Started;
});

// Add admin rest resources
webService.addRestResources("/",
VipStatus.class.getPackage().getName(), false, vipAttributeMap);
webService.addRestResources("/",
"org.apache.pulsar.broker.web", false, attributeMap);
webService.addRestResource("/",
false, vipAttributeMap, VipStatus.class);
webService.addRestResources("/admin",
"org.apache.pulsar.broker.admin.v1", true, attributeMap);
true, attributeMap, "org.apache.pulsar.broker.admin.v1");
webService.addRestResources("/admin/v2",
"org.apache.pulsar.broker.admin.v2", true, attributeMap);
true, attributeMap, "org.apache.pulsar.broker.admin.v2");
webService.addRestResources("/admin/v3",
"org.apache.pulsar.broker.admin.v3", true, attributeMap);
webService.addRestResources("/lookup",
"org.apache.pulsar.broker.lookup", true, attributeMap);
webService.addRestResources("/topics",
"org.apache.pulsar.broker.rest", true, attributeMap);
true, attributeMap, "org.apache.pulsar.broker.admin.v3");
webService.addRestResource("/lookup",
true, attributeMap, TopicLookup.class,
org.apache.pulsar.broker.lookup.v2.TopicLookup.class);
webService.addRestResource("/topics",
true, attributeMap, Topics.class);

// Add metrics servlet
webService.addServlet("/metrics",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,15 @@
*/
package org.apache.pulsar.broker.web;

import static org.apache.pulsar.broker.web.Filters.addFilter;
import static org.apache.pulsar.broker.web.Filters.addFilterClass;
import com.google.common.collect.Lists;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.jetty.JettyStatisticsCollector;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.servlet.DispatcherType;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
Expand All @@ -43,6 +42,7 @@
import org.eclipse.jetty.server.handler.RequestLogHandler;
import org.eclipse.jetty.server.handler.ResourceHandler;
import org.eclipse.jetty.server.handler.StatisticsHandler;
import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.QoSFilter;
Expand Down Expand Up @@ -72,6 +72,7 @@ public class WebService implements AutoCloseable {

private final ServerConnector httpConnector;
private final ServerConnector httpsConnector;
private final FilterInitializer filterInitializer;
private JettyStatisticsCollector jettyStatisticsCollector;

public WebService(PulsarService pulsar) throws PulsarServerException {
Expand Down Expand Up @@ -144,66 +145,106 @@ public WebService(PulsarService pulsar) throws PulsarServerException {
// Limit number of concurrent HTTP connections to avoid getting out of file descriptors
connectors.forEach(c -> c.setAcceptQueueSize(config.getHttpServerAcceptQueueSize()));
server.setConnectors(connectors.toArray(new ServerConnector[connectors.size()]));

filterInitializer = new FilterInitializer(pulsar);
}

public void addRestResources(String basePath, boolean requiresAuthentication, Map<String, Object> attributeMap,
String... javaPackages) {
ResourceConfig config = new ResourceConfig();
for (String javaPackage : javaPackages) {
config.packages(false, javaPackage);
}
addResourceServlet(basePath, requiresAuthentication, attributeMap, config);
}

public void addRestResources(String basePath, String javaPackages, boolean requiresAuthentication,
Map<String, Object> attributeMap) {
public void addRestResource(String basePath, boolean requiresAuthentication, Map<String, Object> attributeMap,
Class<?>... resourceClasses) {
ResourceConfig config = new ResourceConfig();
config.packages("jersey.config.server.provider.packages", javaPackages);
for (Class<?> resourceClass : resourceClasses) {
config.register(resourceClass);
}
addResourceServlet(basePath, requiresAuthentication, attributeMap, config);
}

private void addResourceServlet(String basePath, boolean requiresAuthentication, Map<String, Object> attributeMap,
ResourceConfig config) {
config.register(JsonMapperProvider.class);
config.register(MultiPartFeature.class);
ServletHolder servletHolder = new ServletHolder(new ServletContainer(config));
servletHolder.setAsyncSupported(true);
addServlet(basePath, servletHolder, requiresAuthentication, attributeMap);
}

public void addServlet(String path, ServletHolder servletHolder, boolean requiresAuthentication,
Map<String, Object> attributeMap) {
ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
context.setContextPath(path);
context.addServlet(servletHolder, MATCH_ALL);
if (attributeMap != null) {
attributeMap.forEach((key, value) -> {
context.setAttribute(key, value);
});
}
private static class FilterInitializer {
private final List<FilterHolder> filterHolders = new ArrayList<>();
private final FilterHolder authenticationFilterHolder;
FilterInitializer(PulsarService pulsarService) {
ServiceConfiguration config = pulsarService.getConfiguration();
if (config.getMaxConcurrentHttpRequests() > 0) {
FilterHolder filterHolder = new FilterHolder(QoSFilter.class);
filterHolder.setInitParameter("maxRequests", String.valueOf(config.getMaxConcurrentHttpRequests()));
filterHolders.add(filterHolder);
}

ServiceConfiguration config = pulsar.getConfig();
if (config.isHttpRequestsLimitEnabled()) {
filterHolders.add(new FilterHolder(
new RateLimitingFilter(config.getHttpRequestsMaxPerSecond())));
}

if (config.getMaxConcurrentHttpRequests() > 0) {
addFilterClass(context, QoSFilter.class, Collections.singletonMap("maxRequests",
String.valueOf(config.getMaxConcurrentHttpRequests())));
}
if (!config.getBrokerInterceptors().isEmpty()
|| !config.isDisableBrokerInterceptors()) {
ExceptionHandler handler = new ExceptionHandler();
// Enable PreInterceptFilter only when interceptors are enabled
filterHolders.add(
new FilterHolder(new PreInterceptFilter(pulsarService.getBrokerInterceptor(), handler)));
filterHolders.add(new FilterHolder(new ProcessHandlerFilter(pulsarService)));
}

if (pulsar.getConfiguration().isHttpRequestsLimitEnabled()) {
addFilter(context,
new RateLimitingFilter(pulsar.getConfiguration().getHttpRequestsMaxPerSecond()));
}
if (config.isAuthenticationEnabled()) {
authenticationFilterHolder = new FilterHolder(new AuthenticationFilter(
pulsarService.getBrokerService().getAuthenticationService()));
filterHolders.add(authenticationFilterHolder);
} else {
authenticationFilterHolder = null;
}

if (!config.getBrokerInterceptors().isEmpty()
|| !config.isDisableBrokerInterceptors()) {
ExceptionHandler handler = new ExceptionHandler();
// Enable PreInterceptFilter only when interceptors are enabled
addFilter(context, new PreInterceptFilter(pulsar.getBrokerInterceptor(), handler));
addFilter(context, new ProcessHandlerFilter(pulsar));
}
if (config.isDisableHttpDebugMethods()) {
filterHolders.add(new FilterHolder(new DisableDebugHttpMethodFilter(config)));
}

if (requiresAuthentication && pulsar.getConfiguration().isAuthenticationEnabled()) {
addFilter(context, new AuthenticationFilter(
pulsar.getBrokerService().getAuthenticationService()));
}
if (config.getHttpMaxRequestSize() > 0) {
filterHolders.add(new FilterHolder(
new MaxRequestSizeFilter(
config.getHttpMaxRequestSize())));
}

if (config.isDisableHttpDebugMethods()) {
addFilter(context, new DisableDebugHttpMethodFilter(config));
filterHolders.add(new FilterHolder(new ResponseHandlerFilter(pulsarService)));
}

if (config.getHttpMaxRequestSize() > 0) {
addFilter(context,
new MaxRequestSizeFilter(
config.getHttpMaxRequestSize()));
public void addFilters(ServletContextHandler context, boolean requiresAuthentication) {
for (FilterHolder filterHolder : filterHolders) {
if (requiresAuthentication || filterHolder != authenticationFilterHolder) {
context.addFilter(filterHolder,
MATCH_ALL, EnumSet.allOf(DispatcherType.class));
}
}
}

addFilter(context, new ResponseHandlerFilter(pulsar));
}

public void addServlet(String path, ServletHolder servletHolder, boolean requiresAuthentication,
Map<String, Object> attributeMap) {
ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
// Notice: each context path should be unique, but there's nothing here to verify that
context.setContextPath(path);
context.addServlet(servletHolder, MATCH_ALL);
if (attributeMap != null) {
attributeMap.forEach((key, value) -> {
context.setAttribute(key, value);
});
}
filterInitializer.addFilters(context, requiresAuthentication);
handlers.add(context);
}

Expand Down
Loading

0 comments on commit e268d00

Please sign in to comment.