diff --git a/conf/websocket.conf b/conf/websocket.conf index 1dc70cd719fe5..4bca57a18e682 100644 --- a/conf/websocket.conf +++ b/conf/websocket.conf @@ -19,6 +19,10 @@ # Global Zookeeper quorum connection string globalZookeeperServers= +// Pulsar cluster url to connect to broker (optional if globalZookeeperServers present) +serviceUrl= +serviceUrlTls= + # Port to use to server HTTP request webServicePort=8080 @@ -39,6 +43,10 @@ authenticationProviders= # Enforce authorization authorizationEnabled=false +# Role names that are treated as "super-user", meaning they will be able to do all admin +# operations and publish/consume from all topics +superUserRoles= + # Authentication settings of the proxy itself. Used to connect to brokers brokerClientAuthenticationPlugin= brokerClientAuthenticationParameters= diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyAuthenticationTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyAuthenticationTest.java index 6c970a970b304..18c5afcbb0340 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyAuthenticationTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyAuthenticationTest.java @@ -39,6 +39,7 @@ import com.yahoo.pulsar.client.api.ProducerConsumerBase; import com.yahoo.pulsar.websocket.WebSocketService; import com.yahoo.pulsar.websocket.service.ProxyServer; +import com.yahoo.pulsar.websocket.service.WebSocketProxyConfiguration; import com.yahoo.pulsar.websocket.service.WebSocketServiceStarter; public class ProxyAuthenticationTest extends ProducerConsumerBase { @@ -56,7 +57,7 @@ public void setup() throws Exception { super.internalSetup(); super.producerBaseSetup(); - ServiceConfiguration config = new ServiceConfiguration(); + WebSocketProxyConfiguration config = new WebSocketProxyConfiguration(); config.setWebServicePort(TEST_PORT); config.setClusterName("use"); config.setAuthenticationEnabled(true); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyAuthorizationTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyAuthorizationTest.java index 2d3b09b855721..14057c88e9bcc 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyAuthorizationTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyAuthorizationTest.java @@ -29,7 +29,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import com.yahoo.pulsar.broker.ServiceConfiguration; import com.yahoo.pulsar.broker.auth.MockedPulsarServiceBaseTest; import com.yahoo.pulsar.broker.authorization.AuthorizationManager; import com.yahoo.pulsar.client.admin.PulsarAdminException; @@ -38,6 +37,7 @@ import com.yahoo.pulsar.common.policies.data.ClusterData; import com.yahoo.pulsar.common.policies.data.PropertyAdmin; import com.yahoo.pulsar.websocket.WebSocketService; +import com.yahoo.pulsar.websocket.service.WebSocketProxyConfiguration; public class ProxyAuthorizationTest extends MockedPulsarServiceBaseTest { private WebSocketService service; @@ -53,8 +53,10 @@ protected void setup() throws Exception { conf.setClusterName("c1"); internalSetup(); - ServiceConfiguration config = new ServiceConfiguration(); + WebSocketProxyConfiguration config = new WebSocketProxyConfiguration(); Set superUser = Sets.newHashSet(""); + config.setAuthorizationEnabled(true); + config.setGlobalZookeeperServers("dummy-zk-servers"); config.setSuperUserRoles(superUser); config.setClusterName("c1"); config.setWebServicePort(TEST_PORT); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeTest.java index f1dccbab58e63..1a5d74f887b24 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeTest.java @@ -32,10 +32,10 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import com.yahoo.pulsar.broker.ServiceConfiguration; import com.yahoo.pulsar.client.api.ProducerConsumerBase; import com.yahoo.pulsar.websocket.WebSocketService; import com.yahoo.pulsar.websocket.service.ProxyServer; +import com.yahoo.pulsar.websocket.service.WebSocketProxyConfiguration; import com.yahoo.pulsar.websocket.service.WebSocketServiceStarter; public class ProxyPublishConsumeTest extends ProducerConsumerBase { @@ -51,9 +51,10 @@ public void setup() throws Exception { super.internalSetup(); super.producerBaseSetup(); - ServiceConfiguration config = new ServiceConfiguration(); + WebSocketProxyConfiguration config = new WebSocketProxyConfiguration(); config.setWebServicePort(TEST_PORT); config.setClusterName("use"); + config.setGlobalZookeeperServers("dummy-zk-servers"); service = spy(new WebSocketService(config)); doReturn(mockZooKeeperClientFactory).when(service).getZooKeeperClientFactory(); proxyServer = new ProxyServer(config); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeTls.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeTls.java index 7c4d4c701f1fd..caf40ac894e59 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeTls.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeTls.java @@ -40,10 +40,10 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import com.yahoo.pulsar.broker.ServiceConfiguration; import com.yahoo.pulsar.client.api.ProducerConsumerBase; import com.yahoo.pulsar.websocket.WebSocketService; import com.yahoo.pulsar.websocket.service.ProxyServer; +import com.yahoo.pulsar.websocket.service.WebSocketProxyConfiguration; import com.yahoo.pulsar.websocket.service.WebSocketServiceStarter; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; @@ -64,13 +64,14 @@ public void setup() throws Exception { super.internalSetup(); super.producerBaseSetup(); - ServiceConfiguration config = new ServiceConfiguration(); + WebSocketProxyConfiguration config = new WebSocketProxyConfiguration(); config.setWebServicePort(TEST_PORT); config.setWebServicePortTls(TLS_TEST_PORT); config.setTlsEnabled(true); config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); config.setClusterName("use"); + config.setGlobalZookeeperServers("dummy-zk-servers"); service = spy(new WebSocketService(config)); doReturn(mockZooKeeperClientFactory).when(service).getZooKeeperClientFactory(); proxyServer = new ProxyServer(config); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java new file mode 100644 index 0000000000000..8f7e6f0c8ebf7 --- /dev/null +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java @@ -0,0 +1,113 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yahoo.pulsar.websocket.proxy; + +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; + +import java.net.URI; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.yahoo.pulsar.client.api.ProducerConsumerBase; +import com.yahoo.pulsar.websocket.WebSocketService; +import com.yahoo.pulsar.websocket.service.ProxyServer; +import com.yahoo.pulsar.websocket.service.WebSocketProxyConfiguration; +import com.yahoo.pulsar.websocket.service.WebSocketServiceStarter; + +public class ProxyPublishConsumeWithoutZKTest extends ProducerConsumerBase { + protected String methodName; + private static final String CONSUME_URI = "ws://localhost:6080/ws/consumer/persistent/my-property/use/my-ns/my-topic/my-sub"; + private static final String PRODUCE_URI = "ws://localhost:6080/ws/producer/persistent/my-property/use/my-ns/my-topic/"; + private static final int TEST_PORT = 6080; + private ProxyServer proxyServer; + private WebSocketService service; + + @BeforeClass + public void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + + WebSocketProxyConfiguration config = new WebSocketProxyConfiguration(); + config.setWebServicePort(TEST_PORT); + config.setClusterName("use"); + config.setServiceUrl(pulsar.getWebServiceAddress()); + config.setServiceUrlTls(pulsar.getWebServiceAddressTls()); + service = spy(new WebSocketService(config)); + doReturn(mockZooKeeperClientFactory).when(service).getZooKeeperClientFactory(); + proxyServer = new ProxyServer(config); + WebSocketServiceStarter.start(proxyServer, service); + log.info("Proxy Server Started"); + } + + @AfterClass + protected void cleanup() throws Exception { + super.internalCleanup(); + service.close(); + proxyServer.stop(); + log.info("Finished Cleaning Up Test setup"); + } + + @Test + public void socketTest() throws Exception { + URI consumeUri = URI.create(CONSUME_URI); + URI produceUri = URI.create(PRODUCE_URI); + + WebSocketClient consumeClient = new WebSocketClient(); + SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket(); + WebSocketClient produceClient = new WebSocketClient(); + SimpleProducerSocket produceSocket = new SimpleProducerSocket(); + + try { + consumeClient.start(); + ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest(); + Future consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest); + log.info("Connecting to : {}", consumeUri); + + ClientUpgradeRequest produceRequest = new ClientUpgradeRequest(); + produceClient.start(); + Future producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest); + // let it connect + Thread.sleep(1000); + Assert.assertTrue(consumerFuture.get().isOpen()); + Assert.assertTrue(producerFuture.get().isOpen()); + + consumeSocket.awaitClose(1, TimeUnit.SECONDS); + produceSocket.awaitClose(1, TimeUnit.SECONDS); + Assert.assertTrue(produceSocket.getBuffer().size() > 0); + Assert.assertEquals(produceSocket.getBuffer(), consumeSocket.getBuffer()); + } finally { + try { + consumeClient.stop(); + produceClient.stop(); + } catch (Exception e) { + log.error(e.getMessage()); + } + } + } + + private static final Logger log = LoggerFactory.getLogger(ProxyPublishConsumeWithoutZKTest.class); +} diff --git a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/WebSocketService.java index fb00fd78ccb13..d738f6e1a6dd1 100644 --- a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/WebSocketService.java +++ b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/WebSocketService.java @@ -26,6 +26,7 @@ import javax.websocket.DeploymentException; import org.apache.bookkeeper.util.OrderedSafeExecutor; +import static org.apache.commons.lang3.StringUtils.isNotBlank; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +40,7 @@ import com.yahoo.pulsar.client.api.PulsarClient; import com.yahoo.pulsar.client.api.PulsarClientException; import com.yahoo.pulsar.common.policies.data.ClusterData; +import com.yahoo.pulsar.websocket.service.WebSocketProxyConfiguration; import com.yahoo.pulsar.zookeeper.GlobalZooKeeperCache; import com.yahoo.pulsar.zookeeper.ZooKeeperCache; import com.yahoo.pulsar.zookeeper.ZooKeeperClientFactory; @@ -68,13 +70,11 @@ public class WebSocketService implements Closeable { private ClusterData localCluster; - public WebSocketService(ServiceConfiguration config) throws PulsarClientException, MalformedURLException, - ServletException, DeploymentException, PulsarServerException { - this(null, config); + public WebSocketService(WebSocketProxyConfiguration config) { + this(createClusterData(config), createServiceConfiguration(config)); } - public WebSocketService(ClusterData localCluster, ServiceConfiguration config) throws PulsarClientException, - MalformedURLException, ServletException, DeploymentException, PulsarServerException { + public WebSocketService(ClusterData localCluster, ServiceConfiguration config) { this.config = config; this.localCluster = localCluster; } @@ -82,20 +82,29 @@ public WebSocketService(ClusterData localCluster, ServiceConfiguration config) t public void start() throws PulsarServerException, PulsarClientException, MalformedURLException, ServletException, DeploymentException { - this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(), - (int) config.getZooKeeperSessionTimeoutMillis(), config.getGlobalZookeeperServers(), - this.orderedExecutor, this.executor); - try { - this.globalZkCache.start(); - } catch (IOException e) { - throw new PulsarServerException(e); + if (isNotBlank(config.getGlobalZookeeperServers())) { + this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(), + (int) config.getZooKeeperSessionTimeoutMillis(), config.getGlobalZookeeperServers(), + this.orderedExecutor, this.executor); + try { + this.globalZkCache.start(); + } catch (IOException e) { + throw new PulsarServerException(e); + } + this.configurationCacheService = new ConfigurationCacheService(getGlobalZkCache()); + log.info("Global Zookeeper cache started"); } - this.configurationCacheService = new ConfigurationCacheService(getGlobalZkCache()); - log.info("Global Zookeeper cache started"); + // start authorizationManager + if (config.isAuthorizationEnabled()) { + if (configurationCacheService == null) { + throw new PulsarServerException( + "Failed to initialize authorization manager due to empty GlobalZookeeperServers"); + } + authorizationManager = new AuthorizationManager(this.config, configurationCacheService); + } + // start authentication service authenticationService = new AuthenticationService(this.config); - authorizationManager = new AuthorizationManager(this.config, configurationCacheService); - log.info("Pulsar WebSocket Service started"); } @@ -163,7 +172,37 @@ private PulsarClient createClientInstance(ClusterData clusterData) throws IOExce } } + private static ClusterData createClusterData(WebSocketProxyConfiguration config) { + if (isNotBlank(config.getServiceUrl()) || isNotBlank(config.getServiceUrlTls())) { + return new ClusterData(config.getServiceUrl(), config.getServiceUrlTls()); + } else { + return null; + } + } + + private static ServiceConfiguration createServiceConfiguration(WebSocketProxyConfiguration config) { + ServiceConfiguration serviceConfig = new ServiceConfiguration(); + serviceConfig.setClusterName(config.getClusterName()); + serviceConfig.setWebServicePort(config.getWebServicePort()); + serviceConfig.setWebServicePortTls(config.getWebServicePortTls()); + serviceConfig.setAuthenticationEnabled(config.isAuthenticationEnabled()); + serviceConfig.setAuthenticationProviders(config.getAuthenticationProviders()); + serviceConfig.setBrokerClientAuthenticationPlugin(config.getBrokerClientAuthenticationPlugin()); + serviceConfig.setBrokerClientAuthenticationParameters(config.getBrokerClientAuthenticationParameters()); + serviceConfig.setAuthorizationEnabled(config.isAuthorizationEnabled()); + serviceConfig.setSuperUserRoles(config.getSuperUserRoles()); + serviceConfig.setGlobalZookeeperServers(config.getGlobalZookeeperServers()); + serviceConfig.setZooKeeperSessionTimeoutMillis(config.getZooKeeperSessionTimeoutMillis()); + serviceConfig.setTlsEnabled(config.isTlsEnabled()); + serviceConfig.setTlsCertificateFilePath(config.getTlsCertificateFilePath()); + serviceConfig.setTlsKeyFilePath(config.getTlsKeyFilePath()); + return serviceConfig; + } + private ClusterData retrieveClusterData() throws PulsarServerException { + if (configurationCacheService == null) { + throw new PulsarServerException("Failed to retrieve Cluster data due to empty GlobalZookeeperServers"); + } try { String path = "/admin/clusters/" + config.getClusterName(); return localCluster = configurationCacheService.clustersCache().get(path) diff --git a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/service/ProxyServer.java b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/service/ProxyServer.java index 44702278b2c65..b7a783ff6a6dc 100644 --- a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/service/ProxyServer.java +++ b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/service/ProxyServer.java @@ -44,7 +44,6 @@ import com.google.common.collect.Lists; import com.yahoo.pulsar.broker.PulsarServerException; -import com.yahoo.pulsar.broker.ServiceConfiguration; import com.yahoo.pulsar.client.api.PulsarClientException; import com.yahoo.pulsar.common.util.SecurityUtility; @@ -54,9 +53,9 @@ public class ProxyServer { private final ExecutorService executorService; private final Server server; private final List handlers = Lists.newArrayList(); - private final ServiceConfiguration conf; + private final WebSocketProxyConfiguration conf; - public ProxyServer(ServiceConfiguration config) + public ProxyServer(WebSocketProxyConfiguration config) throws PulsarClientException, MalformedURLException, PulsarServerException { this.conf = config; this.executorService = Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors(), diff --git a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/service/WebSocketProxyConfiguration.java b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/service/WebSocketProxyConfiguration.java new file mode 100644 index 0000000000000..9d7ed01e052d5 --- /dev/null +++ b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/service/WebSocketProxyConfiguration.java @@ -0,0 +1,203 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yahoo.pulsar.websocket.service; + +import java.util.Set; + +import com.google.common.collect.Sets; +import com.yahoo.pulsar.broker.FieldContext; + +public class WebSocketProxyConfiguration { + + // Name of the cluster to which this broker belongs to + @FieldContext(required = true) + private String clusterName; + + // Pulsar cluster url to connect to broker (optional if globalZookeeperServers present) + private String serviceUrl; + private String serviceUrlTls; + + // Global Zookeeper quorum connection string + private String globalZookeeperServers; + // Zookeeper session timeout in milliseconds + private long zooKeeperSessionTimeoutMillis = 30000; + + // Port to use to server HTTP request + private int webServicePort = 8080; + // Port to use to server HTTPS request + private int webServicePortTls = 8443; + // Hostname or IP address the service binds on, default is 0.0.0.0. + private String bindAddress; + // --- Authentication --- + // Enable authentication + private boolean authenticationEnabled; + // Autentication provider name list, which is a list of class names + private Set authenticationProviders = Sets.newTreeSet(); + // Enforce authorization + private boolean authorizationEnabled; + // Role names that are treated as "super-user", meaning they will be able to + // do all admin operations and publish/consume from all topics + private Set superUserRoles = Sets.newTreeSet(); + + // Authentication settings of the proxy itself. Used to connect to brokers + private String brokerClientAuthenticationPlugin; + private String brokerClientAuthenticationParameters; + + /***** --- TLS --- ****/ + // Enable TLS + private boolean tlsEnabled = false; + // Path for the TLS certificate file + private String tlsCertificateFilePath; + // Path for the TLS private key file + private String tlsKeyFilePath; + + public String getClusterName() { + return clusterName; + } + + public void setClusterName(String clusterName) { + this.clusterName = clusterName; + } + + public String getServiceUrl() { + return serviceUrl; + } + + public void setServiceUrl(String serviceUrl) { + this.serviceUrl = serviceUrl; + } + + public String getServiceUrlTls() { + return serviceUrlTls; + } + + public void setServiceUrlTls(String serviceUrlTls) { + this.serviceUrlTls = serviceUrlTls; + } + + public String getGlobalZookeeperServers() { + return globalZookeeperServers; + } + + public void setGlobalZookeeperServers(String globalZookeeperServers) { + this.globalZookeeperServers = globalZookeeperServers; + } + + public long getZooKeeperSessionTimeoutMillis() { + return zooKeeperSessionTimeoutMillis; + } + + public void setZooKeeperSessionTimeoutMillis(long zooKeeperSessionTimeoutMillis) { + this.zooKeeperSessionTimeoutMillis = zooKeeperSessionTimeoutMillis; + } + + public int getWebServicePort() { + return webServicePort; + } + + public void setWebServicePort(int webServicePort) { + this.webServicePort = webServicePort; + } + + public int getWebServicePortTls() { + return webServicePortTls; + } + + public void setWebServicePortTls(int webServicePortTls) { + this.webServicePortTls = webServicePortTls; + } + + public String getBindAddress() { + return bindAddress; + } + + public void setBindAddress(String bindAddress) { + this.bindAddress = bindAddress; + } + + public boolean isAuthenticationEnabled() { + return authenticationEnabled; + } + + public void setAuthenticationEnabled(boolean authenticationEnabled) { + this.authenticationEnabled = authenticationEnabled; + } + + public void setAuthenticationProviders(Set providersClassNames) { + authenticationProviders = providersClassNames; + } + + public Set getAuthenticationProviders() { + return authenticationProviders; + } + + public boolean isAuthorizationEnabled() { + return authorizationEnabled; + } + + public void setAuthorizationEnabled(boolean authorizationEnabled) { + this.authorizationEnabled = authorizationEnabled; + } + + public Set getSuperUserRoles() { + return superUserRoles; + } + + public void setSuperUserRoles(Set superUserRoles) { + this.superUserRoles = superUserRoles; + } + + public String getBrokerClientAuthenticationPlugin() { + return brokerClientAuthenticationPlugin; + } + + public void setBrokerClientAuthenticationPlugin(String brokerClientAuthenticationPlugin) { + this.brokerClientAuthenticationPlugin = brokerClientAuthenticationPlugin; + } + + public String getBrokerClientAuthenticationParameters() { + return brokerClientAuthenticationParameters; + } + + public void setBrokerClientAuthenticationParameters(String brokerClientAuthenticationParameters) { + this.brokerClientAuthenticationParameters = brokerClientAuthenticationParameters; + } + + public boolean isTlsEnabled() { + return tlsEnabled; + } + + public void setTlsEnabled(boolean tlsEnabled) { + this.tlsEnabled = tlsEnabled; + } + + public String getTlsCertificateFilePath() { + return tlsCertificateFilePath; + } + + public void setTlsCertificateFilePath(String tlsCertificateFilePath) { + this.tlsCertificateFilePath = tlsCertificateFilePath; + } + + public String getTlsKeyFilePath() { + return tlsKeyFilePath; + } + + public void setTlsKeyFilePath(String tlsKeyFilePath) { + this.tlsKeyFilePath = tlsKeyFilePath; + } + +} diff --git a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/service/WebSocketServiceStarter.java b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/service/WebSocketServiceStarter.java index eeae4f8c6e80a..83fd1353b761a 100644 --- a/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/service/WebSocketServiceStarter.java +++ b/pulsar-websocket/src/main/java/com/yahoo/pulsar/websocket/service/WebSocketServiceStarter.java @@ -16,12 +16,18 @@ package com.yahoo.pulsar.websocket.service; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.yahoo.pulsar.common.util.FieldParser.update; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; +import java.util.Properties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.yahoo.pulsar.broker.ServiceConfiguration; -import com.yahoo.pulsar.broker.ServiceConfigurationLoader; import com.yahoo.pulsar.websocket.WebSocketConsumerServlet; import com.yahoo.pulsar.websocket.WebSocketProducerServlet; import com.yahoo.pulsar.websocket.WebSocketService; @@ -34,7 +40,7 @@ public static void main(String args[]) throws Exception { // load config file and start proxy service String configFile = args[0]; log.info("Loading configuration from {}", configFile); - ServiceConfiguration config = ServiceConfigurationLoader.create(configFile); + WebSocketProxyConfiguration config = load(configFile); ProxyServer proxyServer = new ProxyServer(config); WebSocketService service = new WebSocketService(config); start(proxyServer, service); @@ -50,6 +56,23 @@ public static void start(ProxyServer proxyServer, WebSocketService service) thro proxyServer.start(); service.start(); } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + public static WebSocketProxyConfiguration load(String configFile) throws IOException, IllegalArgumentException { + final InputStream inStream = new FileInputStream(configFile); + try { + checkNotNull(inStream, "Unbable to read config file " + configFile); + WebSocketProxyConfiguration config = new WebSocketProxyConfiguration(); + Properties properties = new Properties(); + properties.load(inStream); + update((Map) properties, config); + return config; + } finally { + if (inStream != null) { + inStream.close(); + } + } + } private static final Logger log = LoggerFactory.getLogger(WebSocketServiceStarter.class);