Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WebService refactoring #40

Merged
merged 6 commits into from
Sep 27, 2016
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ public void start() throws PulsarServerException {
LOG.info("Starting Pulsar Broker service");
brokerService.start();

this.webService = new WebService(config, this);
this.webService = new WebService(this);
this.webService.addRestResources("/", "com.yahoo.pulsar.broker.web", false);
this.webService.addRestResources("/admin", "com.yahoo.pulsar.broker.admin", true);
this.webService.addRestResources("/lookup", "com.yahoo.pulsar.broker.lookup", true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public abstract class PulsarWebResource {

protected PulsarService pulsar() {
if (pulsar == null) {
pulsar = (PulsarService) servletContext.getAttribute("pulsar");
pulsar = (PulsarService) servletContext.getAttribute(WebService.ATTRIBUTE_PULSAR_NAME);
}

return pulsar;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import com.google.common.collect.Lists;
import com.yahoo.pulsar.broker.PulsarServerException;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
import com.yahoo.pulsar.common.util.SecurityUtility;

Expand All @@ -63,53 +62,64 @@
* Web Service embedded into Pulsar
*/
public class WebService implements AutoCloseable {
private final ServiceConfiguration config;
private final PulsarService pulsar;
private final Server server;
private final List<Handler> handlers = Lists.newArrayList();
private final ExecutorService webServiceExecutor;

/**
* The set of path regexes on which the ApiVersionFilter is installed if needed
*/
private static final List<Pattern> API_VERSION_FILTER_PATTERNS = ImmutableList.of(Pattern.compile("^/lookup.*") // V2
// lookups
private static final List<Pattern> API_VERSION_FILTER_PATTERNS = ImmutableList.of(
Pattern.compile("^/lookup.*") // V2 lookups
);
private static final String MATCH_ALL = "/*";

public static final String ATTRIBUTE_PULSAR_NAME = "pulsar";
public static final String HANDLER_CACHE_CONTROL = "max-age=3600";
public static final String HANDLER_REQUEST_LOG_TZ = "GMT";
public static final int NUM_ACCEPTORS = 32; // make it configurable?
public static final int MAX_CONCURRENT_REQUESTES = 1024; // make it configurable?

private final PulsarService pulsar;
private final Server server;
private final List<Handler> handlers;
private final ExecutorService webServiceExecutor;

public WebService(ServiceConfiguration config, PulsarService pulsar) throws PulsarServerException {
this.config = config;
public WebService(PulsarService pulsar) throws PulsarServerException {
this.handlers = Lists.newArrayList();
this.pulsar = pulsar;
this.webServiceExecutor = Executors.newFixedThreadPool(32, new DefaultThreadFactory("pulsar-web"));
this.webServiceExecutor = Executors.newFixedThreadPool(WebService.NUM_ACCEPTORS, new DefaultThreadFactory("pulsar-web"));
this.server = new Server(new ExecutorThreadPool(webServiceExecutor));
List<ServerConnector> connectors = new ArrayList<>();

ServerConnector connector = new PulsarServerConnector(server, 1, 1);
connector.setPort(config.getWebServicePort());
connector.setPort(pulsar.getConfiguration().getWebServicePort());
connector.setHost(pulsar.getBindAddress());
connectors.add(connector);

if (config.isTlsEnabled()) {
if (pulsar.getConfiguration().isTlsEnabled()) {
SslContextFactory sslCtxFactory = new SslContextFactory();

try {
SSLContext sslCtx = SecurityUtility.createSslContext(config.isTlsAllowInsecureConnection(),
config.getTlsTrustCertsFilePath(), config.getTlsCertificateFilePath(),
config.getTlsKeyFilePath());
sslCtxFactory.setSslContext(sslCtx);
sslCtxFactory.setSslContext(
SecurityUtility.createSslContext(
pulsar.getConfiguration().isTlsAllowInsecureConnection(),
pulsar.getConfiguration().getTlsTrustCertsFilePath(),
pulsar.getConfiguration().getTlsCertificateFilePath(),
pulsar.getConfiguration().getTlsKeyFilePath()));
} catch (GeneralSecurityException e) {
throw new PulsarServerException(e);
}

sslCtxFactory.setWantClientAuth(true);
ServerConnector tlsConnector = new PulsarServerConnector(server, 1, 1, sslCtxFactory);
tlsConnector.setPort(config.getWebServicePortTls());
tlsConnector.setPort(pulsar.getConfiguration().getWebServicePortTls());
tlsConnector.setHost(pulsar.getBindAddress());
connectors.add(tlsConnector);
}

// Limit number of concurrent HTTP connections to avoid getting out of file descriptors
connectors.stream().forEach(c -> c.setAcceptQueueSize(1024 / connectors.size()));
server.setConnectors(connectors.toArray(new ServerConnector[connectors.size()]));
connectors.forEach(c -> {
c.setAcceptQueueSize(WebService.MAX_CONCURRENT_REQUESTES / connectors.size());
server.addConnector(c);
});
}

public void addRestResources(String basePath, String javaPackages, boolean requiresAuthentication) {
Expand All @@ -125,23 +135,23 @@ public void addRestResources(String basePath, String javaPackages, boolean requi
public void addServlet(String path, ServletHolder servletHolder, boolean requiresAuthentication) {
ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
context.setContextPath(path);
context.addServlet(servletHolder, "/*");
context.setAttribute("pulsar", pulsar);
context.setAttribute("config", config);
context.addServlet(servletHolder, MATCH_ALL);
context.setAttribute(WebService.ATTRIBUTE_PULSAR_NAME, pulsar);

if (requiresAuthentication && config.isAuthenticationEnabled()) {
if (requiresAuthentication && pulsar.getConfiguration().isAuthenticationEnabled()) {
FilterHolder filter = new FilterHolder(new AuthenticationFilter(pulsar));
context.addFilter(filter, "/*", EnumSet.allOf(DispatcherType.class));
context.addFilter(filter, MATCH_ALL, EnumSet.allOf(DispatcherType.class));
}

log.info("Servlet path: '{}' -- Enable client version check: {} -- shouldCheckApiVersionOnPath: {}", path,
config.isClientLibraryVersionCheckEnabled(), shouldCheckApiVersionOnPath(path));
if (config.isClientLibraryVersionCheckEnabled() && shouldCheckApiVersionOnPath(path)) {
pulsar.getConfiguration().isClientLibraryVersionCheckEnabled(),
shouldCheckApiVersionOnPath(path));
if (pulsar.getConfiguration().isClientLibraryVersionCheckEnabled() && shouldCheckApiVersionOnPath(path)) {
// Add the ApiVersionFilter to reject request from deprecated
// clients.
FilterHolder holder = new FilterHolder(
new ApiVersionFilter(pulsar, config.isClientLibraryVersionCheckAllowUnversioned()));
context.addFilter(holder, "/*", EnumSet.allOf(DispatcherType.class));
new ApiVersionFilter(pulsar, pulsar.getConfiguration().isClientLibraryVersionCheckAllowUnversioned()));
context.addFilter(holder, MATCH_ALL, EnumSet.allOf(DispatcherType.class));
log.info("Enabling ApiVersionFilter");
}

Expand All @@ -154,7 +164,7 @@ public void addStaticResources(String basePath, String resourcePath) {
ResourceHandler resHandler = new ResourceHandler();
resHandler.setBaseResource(Resource.newClassPathResource(resourcePath));
resHandler.setEtags(true);
resHandler.setCacheControl("max-age=3600");
resHandler.setCacheControl(WebService.HANDLER_CACHE_CONTROL);
capHandler.setHandler(resHandler);
handlers.add(capHandler);
}
Expand All @@ -181,7 +191,7 @@ public void start() throws PulsarServerException {
RequestLogHandler requestLogHandler = new RequestLogHandler();
Slf4jRequestLog requestLog = new Slf4jRequestLog();
requestLog.setExtended(true);
requestLog.setLogTimeZone("GMT");
requestLog.setLogTimeZone(WebService.HANDLER_REQUEST_LOG_TZ);
requestLog.setLogLatency(true);
requestLogHandler.setRequestLog(requestLog);
handlers.add(0, new ContextHandlerCollection());
Expand Down