Skip to content

Commit

Permalink
WebService refactoring (#40)
Browse files Browse the repository at this point in the history
* Little WebService refactor, removing magic values, removing unnecessary argument in the constructor.
  • Loading branch information
radekg authored and merlimat committed Sep 27, 2016
1 parent 577596e commit 0577fda
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ public void start() throws PulsarServerException {
LOG.info("Starting Pulsar Broker service");
brokerService.start();

this.webService = new WebService(config, this);
this.webService = new WebService(this);
this.webService.addRestResources("/", "com.yahoo.pulsar.broker.web", false);
this.webService.addRestResources("/admin", "com.yahoo.pulsar.broker.admin", true);
this.webService.addRestResources("/lookup", "com.yahoo.pulsar.broker.lookup", true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public abstract class PulsarWebResource {

protected PulsarService pulsar() {
if (pulsar == null) {
pulsar = (PulsarService) servletContext.getAttribute("pulsar");
pulsar = (PulsarService) servletContext.getAttribute(WebService.ATTRIBUTE_PULSAR_NAME);
}

return pulsar;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import com.google.common.collect.Lists;
import com.yahoo.pulsar.broker.PulsarServerException;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
import com.yahoo.pulsar.common.util.SecurityUtility;

Expand All @@ -63,52 +62,61 @@
* Web Service embedded into Pulsar
*/
public class WebService implements AutoCloseable {
private final ServiceConfiguration config;
private final PulsarService pulsar;
private final Server server;
private final List<Handler> handlers = Lists.newArrayList();
private final ExecutorService webServiceExecutor;

/**
* The set of path regexes on which the ApiVersionFilter is installed if needed
*/
private static final List<Pattern> API_VERSION_FILTER_PATTERNS = ImmutableList.of(Pattern.compile("^/lookup.*") // V2
// lookups
private static final List<Pattern> API_VERSION_FILTER_PATTERNS = ImmutableList.of(
Pattern.compile("^/lookup.*") // V2 lookups
);
private static final String MATCH_ALL = "/*";

public static final String ATTRIBUTE_PULSAR_NAME = "pulsar";
public static final String HANDLER_CACHE_CONTROL = "max-age=3600";
public static final String HANDLER_REQUEST_LOG_TZ = "GMT";
public static final int NUM_ACCEPTORS = 32; // make it configurable?
public static final int MAX_CONCURRENT_REQUESTES = 1024; // make it configurable?

private final PulsarService pulsar;
private final Server server;
private final List<Handler> handlers;
private final ExecutorService webServiceExecutor;

public WebService(ServiceConfiguration config, PulsarService pulsar) throws PulsarServerException {
this.config = config;
public WebService(PulsarService pulsar) throws PulsarServerException {
this.handlers = Lists.newArrayList();
this.pulsar = pulsar;
this.webServiceExecutor = Executors.newFixedThreadPool(32, new DefaultThreadFactory("pulsar-web"));
this.webServiceExecutor = Executors.newFixedThreadPool(WebService.NUM_ACCEPTORS, new DefaultThreadFactory("pulsar-web"));
this.server = new Server(new ExecutorThreadPool(webServiceExecutor));
List<ServerConnector> connectors = new ArrayList<>();

ServerConnector connector = new PulsarServerConnector(server, 1, 1);
connector.setPort(config.getWebServicePort());
connector.setPort(pulsar.getConfiguration().getWebServicePort());
connector.setHost(pulsar.getBindAddress());
connectors.add(connector);

if (config.isTlsEnabled()) {
if (pulsar.getConfiguration().isTlsEnabled()) {
SslContextFactory sslCtxFactory = new SslContextFactory();

try {
SSLContext sslCtx = SecurityUtility.createSslContext(config.isTlsAllowInsecureConnection(),
config.getTlsTrustCertsFilePath(), config.getTlsCertificateFilePath(),
config.getTlsKeyFilePath());
sslCtxFactory.setSslContext(sslCtx);
sslCtxFactory.setSslContext(
SecurityUtility.createSslContext(
pulsar.getConfiguration().isTlsAllowInsecureConnection(),
pulsar.getConfiguration().getTlsTrustCertsFilePath(),
pulsar.getConfiguration().getTlsCertificateFilePath(),
pulsar.getConfiguration().getTlsKeyFilePath()));
} catch (GeneralSecurityException e) {
throw new PulsarServerException(e);
}

sslCtxFactory.setWantClientAuth(true);
ServerConnector tlsConnector = new PulsarServerConnector(server, 1, 1, sslCtxFactory);
tlsConnector.setPort(config.getWebServicePortTls());
tlsConnector.setPort(pulsar.getConfiguration().getWebServicePortTls());
tlsConnector.setHost(pulsar.getBindAddress());
connectors.add(tlsConnector);
}

// Limit number of concurrent HTTP connections to avoid getting out of file descriptors
connectors.stream().forEach(c -> c.setAcceptQueueSize(1024 / connectors.size()));
connectors.forEach(c -> c.setAcceptQueueSize(WebService.MAX_CONCURRENT_REQUESTES / connectors.size()));
server.setConnectors(connectors.toArray(new ServerConnector[connectors.size()]));
}

Expand All @@ -125,23 +133,23 @@ public void addRestResources(String basePath, String javaPackages, boolean requi
public void addServlet(String path, ServletHolder servletHolder, boolean requiresAuthentication) {
ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
context.setContextPath(path);
context.addServlet(servletHolder, "/*");
context.setAttribute("pulsar", pulsar);
context.setAttribute("config", config);
context.addServlet(servletHolder, MATCH_ALL);
context.setAttribute(WebService.ATTRIBUTE_PULSAR_NAME, pulsar);

if (requiresAuthentication && config.isAuthenticationEnabled()) {
if (requiresAuthentication && pulsar.getConfiguration().isAuthenticationEnabled()) {
FilterHolder filter = new FilterHolder(new AuthenticationFilter(pulsar));
context.addFilter(filter, "/*", EnumSet.allOf(DispatcherType.class));
context.addFilter(filter, MATCH_ALL, EnumSet.allOf(DispatcherType.class));
}

log.info("Servlet path: '{}' -- Enable client version check: {} -- shouldCheckApiVersionOnPath: {}", path,
config.isClientLibraryVersionCheckEnabled(), shouldCheckApiVersionOnPath(path));
if (config.isClientLibraryVersionCheckEnabled() && shouldCheckApiVersionOnPath(path)) {
pulsar.getConfiguration().isClientLibraryVersionCheckEnabled(),
shouldCheckApiVersionOnPath(path));
if (pulsar.getConfiguration().isClientLibraryVersionCheckEnabled() && shouldCheckApiVersionOnPath(path)) {
// Add the ApiVersionFilter to reject request from deprecated
// clients.
FilterHolder holder = new FilterHolder(
new ApiVersionFilter(pulsar, config.isClientLibraryVersionCheckAllowUnversioned()));
context.addFilter(holder, "/*", EnumSet.allOf(DispatcherType.class));
new ApiVersionFilter(pulsar, pulsar.getConfiguration().isClientLibraryVersionCheckAllowUnversioned()));
context.addFilter(holder, MATCH_ALL, EnumSet.allOf(DispatcherType.class));
log.info("Enabling ApiVersionFilter");
}

Expand All @@ -154,7 +162,7 @@ public void addStaticResources(String basePath, String resourcePath) {
ResourceHandler resHandler = new ResourceHandler();
resHandler.setBaseResource(Resource.newClassPathResource(resourcePath));
resHandler.setEtags(true);
resHandler.setCacheControl("max-age=3600");
resHandler.setCacheControl(WebService.HANDLER_CACHE_CONTROL);
capHandler.setHandler(resHandler);
handlers.add(capHandler);
}
Expand All @@ -181,7 +189,7 @@ public void start() throws PulsarServerException {
RequestLogHandler requestLogHandler = new RequestLogHandler();
Slf4jRequestLog requestLog = new Slf4jRequestLog();
requestLog.setExtended(true);
requestLog.setLogTimeZone("GMT");
requestLog.setLogTimeZone(WebService.HANDLER_REQUEST_LOG_TZ);
requestLog.setLogLatency(true);
requestLogHandler.setRequestLog(requestLog);
handlers.add(0, new ContextHandlerCollection());
Expand Down

0 comments on commit 0577fda

Please sign in to comment.