diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/PulsarService.java index 045fdc82e2cfd..2684f6661cafb 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/PulsarService.java @@ -246,7 +246,7 @@ public void start() throws PulsarServerException { LOG.info("Starting Pulsar Broker service"); brokerService.start(); - this.webService = new WebService(config, this); + this.webService = new WebService(this); this.webService.addRestResources("/", "com.yahoo.pulsar.broker.web", false); this.webService.addRestResources("/admin", "com.yahoo.pulsar.broker.admin", true); this.webService.addRestResources("/lookup", "com.yahoo.pulsar.broker.lookup", true); diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/web/PulsarWebResource.java index 1c7a0fe7808a0..248afbcf10b10 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/web/PulsarWebResource.java @@ -73,7 +73,7 @@ public abstract class PulsarWebResource { protected PulsarService pulsar() { if (pulsar == null) { - pulsar = (PulsarService) servletContext.getAttribute("pulsar"); + pulsar = (PulsarService) servletContext.getAttribute(WebService.ATTRIBUTE_PULSAR_NAME); } return pulsar; diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/web/WebService.java index 93a9e5d558db0..2d81254137cee 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/web/WebService.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/web/WebService.java @@ -53,7 +53,6 @@ import com.google.common.collect.Lists; import com.yahoo.pulsar.broker.PulsarServerException; import com.yahoo.pulsar.broker.PulsarService; -import com.yahoo.pulsar.broker.ServiceConfiguration; import com.yahoo.pulsar.common.util.ObjectMapperFactory; import com.yahoo.pulsar.common.util.SecurityUtility; @@ -63,52 +62,61 @@ * Web Service embedded into Pulsar */ public class WebService implements AutoCloseable { - private final ServiceConfiguration config; - private final PulsarService pulsar; - private final Server server; - private final List handlers = Lists.newArrayList(); - private final ExecutorService webServiceExecutor; /** * The set of path regexes on which the ApiVersionFilter is installed if needed */ - private static final List API_VERSION_FILTER_PATTERNS = ImmutableList.of(Pattern.compile("^/lookup.*") // V2 - // lookups + private static final List API_VERSION_FILTER_PATTERNS = ImmutableList.of( + Pattern.compile("^/lookup.*") // V2 lookups ); + private static final String MATCH_ALL = "/*"; + + public static final String ATTRIBUTE_PULSAR_NAME = "pulsar"; + public static final String HANDLER_CACHE_CONTROL = "max-age=3600"; + public static final String HANDLER_REQUEST_LOG_TZ = "GMT"; + public static final int NUM_ACCEPTORS = 32; // make it configurable? + public static final int MAX_CONCURRENT_REQUESTES = 1024; // make it configurable? + + private final PulsarService pulsar; + private final Server server; + private final List handlers; + private final ExecutorService webServiceExecutor; - public WebService(ServiceConfiguration config, PulsarService pulsar) throws PulsarServerException { - this.config = config; + public WebService(PulsarService pulsar) throws PulsarServerException { + this.handlers = Lists.newArrayList(); this.pulsar = pulsar; - this.webServiceExecutor = Executors.newFixedThreadPool(32, new DefaultThreadFactory("pulsar-web")); + this.webServiceExecutor = Executors.newFixedThreadPool(WebService.NUM_ACCEPTORS, new DefaultThreadFactory("pulsar-web")); this.server = new Server(new ExecutorThreadPool(webServiceExecutor)); List connectors = new ArrayList<>(); ServerConnector connector = new PulsarServerConnector(server, 1, 1); - connector.setPort(config.getWebServicePort()); + connector.setPort(pulsar.getConfiguration().getWebServicePort()); connector.setHost(pulsar.getBindAddress()); connectors.add(connector); - if (config.isTlsEnabled()) { + if (pulsar.getConfiguration().isTlsEnabled()) { SslContextFactory sslCtxFactory = new SslContextFactory(); try { - SSLContext sslCtx = SecurityUtility.createSslContext(config.isTlsAllowInsecureConnection(), - config.getTlsTrustCertsFilePath(), config.getTlsCertificateFilePath(), - config.getTlsKeyFilePath()); - sslCtxFactory.setSslContext(sslCtx); + sslCtxFactory.setSslContext( + SecurityUtility.createSslContext( + pulsar.getConfiguration().isTlsAllowInsecureConnection(), + pulsar.getConfiguration().getTlsTrustCertsFilePath(), + pulsar.getConfiguration().getTlsCertificateFilePath(), + pulsar.getConfiguration().getTlsKeyFilePath())); } catch (GeneralSecurityException e) { throw new PulsarServerException(e); } sslCtxFactory.setWantClientAuth(true); ServerConnector tlsConnector = new PulsarServerConnector(server, 1, 1, sslCtxFactory); - tlsConnector.setPort(config.getWebServicePortTls()); + tlsConnector.setPort(pulsar.getConfiguration().getWebServicePortTls()); tlsConnector.setHost(pulsar.getBindAddress()); connectors.add(tlsConnector); } // Limit number of concurrent HTTP connections to avoid getting out of file descriptors - connectors.stream().forEach(c -> c.setAcceptQueueSize(1024 / connectors.size())); + connectors.forEach(c -> c.setAcceptQueueSize(WebService.MAX_CONCURRENT_REQUESTES / connectors.size())); server.setConnectors(connectors.toArray(new ServerConnector[connectors.size()])); } @@ -125,23 +133,23 @@ public void addRestResources(String basePath, String javaPackages, boolean requi public void addServlet(String path, ServletHolder servletHolder, boolean requiresAuthentication) { ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); context.setContextPath(path); - context.addServlet(servletHolder, "/*"); - context.setAttribute("pulsar", pulsar); - context.setAttribute("config", config); + context.addServlet(servletHolder, MATCH_ALL); + context.setAttribute(WebService.ATTRIBUTE_PULSAR_NAME, pulsar); - if (requiresAuthentication && config.isAuthenticationEnabled()) { + if (requiresAuthentication && pulsar.getConfiguration().isAuthenticationEnabled()) { FilterHolder filter = new FilterHolder(new AuthenticationFilter(pulsar)); - context.addFilter(filter, "/*", EnumSet.allOf(DispatcherType.class)); + context.addFilter(filter, MATCH_ALL, EnumSet.allOf(DispatcherType.class)); } log.info("Servlet path: '{}' -- Enable client version check: {} -- shouldCheckApiVersionOnPath: {}", path, - config.isClientLibraryVersionCheckEnabled(), shouldCheckApiVersionOnPath(path)); - if (config.isClientLibraryVersionCheckEnabled() && shouldCheckApiVersionOnPath(path)) { + pulsar.getConfiguration().isClientLibraryVersionCheckEnabled(), + shouldCheckApiVersionOnPath(path)); + if (pulsar.getConfiguration().isClientLibraryVersionCheckEnabled() && shouldCheckApiVersionOnPath(path)) { // Add the ApiVersionFilter to reject request from deprecated // clients. FilterHolder holder = new FilterHolder( - new ApiVersionFilter(pulsar, config.isClientLibraryVersionCheckAllowUnversioned())); - context.addFilter(holder, "/*", EnumSet.allOf(DispatcherType.class)); + new ApiVersionFilter(pulsar, pulsar.getConfiguration().isClientLibraryVersionCheckAllowUnversioned())); + context.addFilter(holder, MATCH_ALL, EnumSet.allOf(DispatcherType.class)); log.info("Enabling ApiVersionFilter"); } @@ -154,7 +162,7 @@ public void addStaticResources(String basePath, String resourcePath) { ResourceHandler resHandler = new ResourceHandler(); resHandler.setBaseResource(Resource.newClassPathResource(resourcePath)); resHandler.setEtags(true); - resHandler.setCacheControl("max-age=3600"); + resHandler.setCacheControl(WebService.HANDLER_CACHE_CONTROL); capHandler.setHandler(resHandler); handlers.add(capHandler); } @@ -181,7 +189,7 @@ public void start() throws PulsarServerException { RequestLogHandler requestLogHandler = new RequestLogHandler(); Slf4jRequestLog requestLog = new Slf4jRequestLog(); requestLog.setExtended(true); - requestLog.setLogTimeZone("GMT"); + requestLog.setLogTimeZone(WebService.HANDLER_REQUEST_LOG_TZ); requestLog.setLogLatency(true); requestLogHandler.setRequestLog(requestLog); handlers.add(0, new ContextHandlerCollection());