Skip to content

Commit

Permalink
Pass brokerServiceUrl to websocket service configuration (#166)
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia authored Jan 20, 2017
1 parent 8818eea commit 0569689
Show file tree
Hide file tree
Showing 10 changed files with 419 additions and 29 deletions.
8 changes: 8 additions & 0 deletions conf/websocket.conf
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
# Global Zookeeper quorum connection string
globalZookeeperServers=

// Pulsar cluster url to connect to broker (optional if globalZookeeperServers present)
serviceUrl=
serviceUrlTls=

# Port to use to server HTTP request
webServicePort=8080

Expand All @@ -39,6 +43,10 @@ authenticationProviders=
# Enforce authorization
authorizationEnabled=false

# Role names that are treated as "super-user", meaning they will be able to do all admin
# operations and publish/consume from all topics
superUserRoles=

# Authentication settings of the proxy itself. Used to connect to brokers
brokerClientAuthenticationPlugin=
brokerClientAuthenticationParameters=
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.yahoo.pulsar.client.api.ProducerConsumerBase;
import com.yahoo.pulsar.websocket.WebSocketService;
import com.yahoo.pulsar.websocket.service.ProxyServer;
import com.yahoo.pulsar.websocket.service.WebSocketProxyConfiguration;
import com.yahoo.pulsar.websocket.service.WebSocketServiceStarter;

public class ProxyAuthenticationTest extends ProducerConsumerBase {
Expand All @@ -56,7 +57,7 @@ public void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();

ServiceConfiguration config = new ServiceConfiguration();
WebSocketProxyConfiguration config = new WebSocketProxyConfiguration();
config.setWebServicePort(TEST_PORT);
config.setClusterName("use");
config.setAuthenticationEnabled(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import com.yahoo.pulsar.broker.authorization.AuthorizationManager;
import com.yahoo.pulsar.client.admin.PulsarAdminException;
Expand All @@ -38,6 +37,7 @@
import com.yahoo.pulsar.common.policies.data.ClusterData;
import com.yahoo.pulsar.common.policies.data.PropertyAdmin;
import com.yahoo.pulsar.websocket.WebSocketService;
import com.yahoo.pulsar.websocket.service.WebSocketProxyConfiguration;

public class ProxyAuthorizationTest extends MockedPulsarServiceBaseTest {
private WebSocketService service;
Expand All @@ -53,8 +53,10 @@ protected void setup() throws Exception {
conf.setClusterName("c1");
internalSetup();

ServiceConfiguration config = new ServiceConfiguration();
WebSocketProxyConfiguration config = new WebSocketProxyConfiguration();
Set<String> superUser = Sets.newHashSet("");
config.setAuthorizationEnabled(true);
config.setGlobalZookeeperServers("dummy-zk-servers");
config.setSuperUserRoles(superUser);
config.setClusterName("c1");
config.setWebServicePort(TEST_PORT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.client.api.ProducerConsumerBase;
import com.yahoo.pulsar.websocket.WebSocketService;
import com.yahoo.pulsar.websocket.service.ProxyServer;
import com.yahoo.pulsar.websocket.service.WebSocketProxyConfiguration;
import com.yahoo.pulsar.websocket.service.WebSocketServiceStarter;

public class ProxyPublishConsumeTest extends ProducerConsumerBase {
Expand All @@ -51,9 +51,10 @@ public void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();

ServiceConfiguration config = new ServiceConfiguration();
WebSocketProxyConfiguration config = new WebSocketProxyConfiguration();
config.setWebServicePort(TEST_PORT);
config.setClusterName("use");
config.setGlobalZookeeperServers("dummy-zk-servers");
service = spy(new WebSocketService(config));
doReturn(mockZooKeeperClientFactory).when(service).getZooKeeperClientFactory();
proxyServer = new ProxyServer(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.client.api.ProducerConsumerBase;
import com.yahoo.pulsar.websocket.WebSocketService;
import com.yahoo.pulsar.websocket.service.ProxyServer;
import com.yahoo.pulsar.websocket.service.WebSocketProxyConfiguration;
import com.yahoo.pulsar.websocket.service.WebSocketServiceStarter;

import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
Expand All @@ -64,13 +64,14 @@ public void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();

ServiceConfiguration config = new ServiceConfiguration();
WebSocketProxyConfiguration config = new WebSocketProxyConfiguration();
config.setWebServicePort(TEST_PORT);
config.setWebServicePortTls(TLS_TEST_PORT);
config.setTlsEnabled(true);
config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config.setClusterName("use");
config.setGlobalZookeeperServers("dummy-zk-servers");
service = spy(new WebSocketService(config));
doReturn(mockZooKeeperClientFactory).when(service).getZooKeeperClientFactory();
proxyServer = new ProxyServer(config);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.websocket.proxy;

import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;

import java.net.URI;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import com.yahoo.pulsar.client.api.ProducerConsumerBase;
import com.yahoo.pulsar.websocket.WebSocketService;
import com.yahoo.pulsar.websocket.service.ProxyServer;
import com.yahoo.pulsar.websocket.service.WebSocketProxyConfiguration;
import com.yahoo.pulsar.websocket.service.WebSocketServiceStarter;

public class ProxyPublishConsumeWithoutZKTest extends ProducerConsumerBase {
protected String methodName;
private static final String CONSUME_URI = "ws://localhost:6080/ws/consumer/persistent/my-property/use/my-ns/my-topic/my-sub";
private static final String PRODUCE_URI = "ws://localhost:6080/ws/producer/persistent/my-property/use/my-ns/my-topic/";
private static final int TEST_PORT = 6080;
private ProxyServer proxyServer;
private WebSocketService service;

@BeforeClass
public void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();

WebSocketProxyConfiguration config = new WebSocketProxyConfiguration();
config.setWebServicePort(TEST_PORT);
config.setClusterName("use");
config.setServiceUrl(pulsar.getWebServiceAddress());
config.setServiceUrlTls(pulsar.getWebServiceAddressTls());
service = spy(new WebSocketService(config));
doReturn(mockZooKeeperClientFactory).when(service).getZooKeeperClientFactory();
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
log.info("Proxy Server Started");
}

@AfterClass
protected void cleanup() throws Exception {
super.internalCleanup();
service.close();
proxyServer.stop();
log.info("Finished Cleaning Up Test setup");
}

@Test
public void socketTest() throws Exception {
URI consumeUri = URI.create(CONSUME_URI);
URI produceUri = URI.create(PRODUCE_URI);

WebSocketClient consumeClient = new WebSocketClient();
SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
WebSocketClient produceClient = new WebSocketClient();
SimpleProducerSocket produceSocket = new SimpleProducerSocket();

try {
consumeClient.start();
ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();
Future<Session> consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
log.info("Connecting to : {}", consumeUri);

ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
produceClient.start();
Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
// let it connect
Thread.sleep(1000);
Assert.assertTrue(consumerFuture.get().isOpen());
Assert.assertTrue(producerFuture.get().isOpen());

consumeSocket.awaitClose(1, TimeUnit.SECONDS);
produceSocket.awaitClose(1, TimeUnit.SECONDS);
Assert.assertTrue(produceSocket.getBuffer().size() > 0);
Assert.assertEquals(produceSocket.getBuffer(), consumeSocket.getBuffer());
} finally {
try {
consumeClient.stop();
produceClient.stop();
} catch (Exception e) {
log.error(e.getMessage());
}
}
}

private static final Logger log = LoggerFactory.getLogger(ProxyPublishConsumeWithoutZKTest.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import javax.websocket.DeploymentException;

import org.apache.bookkeeper.util.OrderedSafeExecutor;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -39,6 +40,7 @@
import com.yahoo.pulsar.client.api.PulsarClient;
import com.yahoo.pulsar.client.api.PulsarClientException;
import com.yahoo.pulsar.common.policies.data.ClusterData;
import com.yahoo.pulsar.websocket.service.WebSocketProxyConfiguration;
import com.yahoo.pulsar.zookeeper.GlobalZooKeeperCache;
import com.yahoo.pulsar.zookeeper.ZooKeeperCache;
import com.yahoo.pulsar.zookeeper.ZooKeeperClientFactory;
Expand Down Expand Up @@ -68,34 +70,41 @@ public class WebSocketService implements Closeable {

private ClusterData localCluster;

public WebSocketService(ServiceConfiguration config) throws PulsarClientException, MalformedURLException,
ServletException, DeploymentException, PulsarServerException {
this(null, config);
public WebSocketService(WebSocketProxyConfiguration config) {
this(createClusterData(config), createServiceConfiguration(config));
}

public WebSocketService(ClusterData localCluster, ServiceConfiguration config) throws PulsarClientException,
MalformedURLException, ServletException, DeploymentException, PulsarServerException {
public WebSocketService(ClusterData localCluster, ServiceConfiguration config) {
this.config = config;
this.localCluster = localCluster;
}

public void start() throws PulsarServerException, PulsarClientException, MalformedURLException, ServletException,
DeploymentException {

this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(),
(int) config.getZooKeeperSessionTimeoutMillis(), config.getGlobalZookeeperServers(),
this.orderedExecutor, this.executor);
try {
this.globalZkCache.start();
} catch (IOException e) {
throw new PulsarServerException(e);
if (isNotBlank(config.getGlobalZookeeperServers())) {
this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(),
(int) config.getZooKeeperSessionTimeoutMillis(), config.getGlobalZookeeperServers(),
this.orderedExecutor, this.executor);
try {
this.globalZkCache.start();
} catch (IOException e) {
throw new PulsarServerException(e);
}
this.configurationCacheService = new ConfigurationCacheService(getGlobalZkCache());
log.info("Global Zookeeper cache started");
}

this.configurationCacheService = new ConfigurationCacheService(getGlobalZkCache());
log.info("Global Zookeeper cache started");
// start authorizationManager
if (config.isAuthorizationEnabled()) {
if (configurationCacheService == null) {
throw new PulsarServerException(
"Failed to initialize authorization manager due to empty GlobalZookeeperServers");
}
authorizationManager = new AuthorizationManager(this.config, configurationCacheService);
}
// start authentication service
authenticationService = new AuthenticationService(this.config);
authorizationManager = new AuthorizationManager(this.config, configurationCacheService);

log.info("Pulsar WebSocket Service started");
}

Expand Down Expand Up @@ -163,7 +172,37 @@ private PulsarClient createClientInstance(ClusterData clusterData) throws IOExce
}
}

private static ClusterData createClusterData(WebSocketProxyConfiguration config) {
if (isNotBlank(config.getServiceUrl()) || isNotBlank(config.getServiceUrlTls())) {
return new ClusterData(config.getServiceUrl(), config.getServiceUrlTls());
} else {
return null;
}
}

private static ServiceConfiguration createServiceConfiguration(WebSocketProxyConfiguration config) {
ServiceConfiguration serviceConfig = new ServiceConfiguration();
serviceConfig.setClusterName(config.getClusterName());
serviceConfig.setWebServicePort(config.getWebServicePort());
serviceConfig.setWebServicePortTls(config.getWebServicePortTls());
serviceConfig.setAuthenticationEnabled(config.isAuthenticationEnabled());
serviceConfig.setAuthenticationProviders(config.getAuthenticationProviders());
serviceConfig.setBrokerClientAuthenticationPlugin(config.getBrokerClientAuthenticationPlugin());
serviceConfig.setBrokerClientAuthenticationParameters(config.getBrokerClientAuthenticationParameters());
serviceConfig.setAuthorizationEnabled(config.isAuthorizationEnabled());
serviceConfig.setSuperUserRoles(config.getSuperUserRoles());
serviceConfig.setGlobalZookeeperServers(config.getGlobalZookeeperServers());
serviceConfig.setZooKeeperSessionTimeoutMillis(config.getZooKeeperSessionTimeoutMillis());
serviceConfig.setTlsEnabled(config.isTlsEnabled());
serviceConfig.setTlsCertificateFilePath(config.getTlsCertificateFilePath());
serviceConfig.setTlsKeyFilePath(config.getTlsKeyFilePath());
return serviceConfig;
}

private ClusterData retrieveClusterData() throws PulsarServerException {
if (configurationCacheService == null) {
throw new PulsarServerException("Failed to retrieve Cluster data due to empty GlobalZookeeperServers");
}
try {
String path = "/admin/clusters/" + config.getClusterName();
return localCluster = configurationCacheService.clustersCache().get(path)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@

import com.google.common.collect.Lists;
import com.yahoo.pulsar.broker.PulsarServerException;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.client.api.PulsarClientException;
import com.yahoo.pulsar.common.util.SecurityUtility;

Expand All @@ -54,9 +53,9 @@ public class ProxyServer {
private final ExecutorService executorService;
private final Server server;
private final List<Handler> handlers = Lists.newArrayList();
private final ServiceConfiguration conf;
private final WebSocketProxyConfiguration conf;

public ProxyServer(ServiceConfiguration config)
public ProxyServer(WebSocketProxyConfiguration config)
throws PulsarClientException, MalformedURLException, PulsarServerException {
this.conf = config;
this.executorService = Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors(),
Expand Down
Loading

0 comments on commit 0569689

Please sign in to comment.