Skip to content

Commit

Permalink
Fix: broken websocket test and proxy-producer error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia committed Nov 1, 2016
1 parent 340dc8d commit ac4d658
Show file tree
Hide file tree
Showing 11 changed files with 162 additions and 86 deletions.
11 changes: 11 additions & 0 deletions docs/WebSocket.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,22 @@ http://{serviceUrl}:8080/ws/producer/persistent/{property}/{cluster}/{namespace}

##### Acknowledgement from server

###### Success response
```json
{
"result": "ok",
"messageId": "CAAQAw==",
"context": "1"
}
```
###### Failure response
```json
{
"result": "send-error:3",
"errorMsg": "Failed to de-serialize from JSON",
"context": "1"
}
```

| Key | Type | Requirement | Explanation |
|:------------|:------:|:-----------:|:--------------------------------------------:|
Expand Down Expand Up @@ -172,6 +181,8 @@ following error codes:
| 4 | Failed to serialize to JSON |
| 5 | Failed to authenticate client |
| 6 | Client is not authorized |
| 7 | Invalid payload encoding |
| 8 | Unknown error |

Application is responsible to re-establish a new WebSocket session after
a backoff period.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ protected Policies getNamespacePolicies(String property, String cluster, String
}
}

public ObjectMapper jsonMapper() {
public static ObjectMapper jsonMapper() {
return ObjectMapperFactory.getThreadLocal();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,50 +20,60 @@
import static org.mockito.Mockito.spy;

import java.net.URI;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.bookkeeper.test.PortManager;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import com.google.common.collect.Sets;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import com.yahoo.pulsar.client.api.ProducerConsumerBase;
import com.yahoo.pulsar.websocket.WebSocketService;
import com.yahoo.pulsar.websocket.service.ProxyServer;
import com.yahoo.pulsar.websocket.service.WebSocketServiceStarter;

public class ProxyAuthenticationTest extends MockedPulsarServiceBaseTest {
public class ProxyAuthenticationTest extends ProducerConsumerBase {
protected String methodName;
private static final int TEST_PORT = PortManager.nextFreePort();
private static final int TEST_PORT = 6080;
private static final String CONSUME_URI = "ws://localhost:" + TEST_PORT
+ "/consume/persistent/my-property/cluster1/my-ns/my-topic/my-sub";
+ "/ws/consumer/persistent/my-property/use/my-ns/my-topic/my-sub";
private static final String PRODUCE_URI = "ws://localhost:" + TEST_PORT
+ "/produce/persistent/my-property/cluster1/my-ns/my-topic/";
+ "/ws/producer/persistent/my-property/use/my-ns/my-topic/";
private ProxyServer proxyServer;
private WebSocketService service;

@BeforeClass
public void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();

ServiceConfiguration config = new ServiceConfiguration();
config.setWebServicePort(TEST_PORT);
config.setClusterName("use");
config.setAuthenticationEnabled(true);
config.setAuthenticationProviders(
Sets.newHashSet("com.yahoo.pulsar.websocket.proxy.MockAuthenticationProvider"));
service = spy(new WebSocketService(config));
doReturn(mockZooKeeperClientFactory).when(service).getZooKeeperClientFactory();
service.start();
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
log.info("Proxy Server Started");
}

@Override
@AfterClass
protected void cleanup() throws Exception {
super.internalCleanup();
service.close();
proxyServer.stop();
log.info("Finished Cleaning Up Test setup");

}
Expand All @@ -81,16 +91,20 @@ public void socketTest() throws InterruptedException {
try {
consumeClient.start();
ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();
consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
log.info("Connecting to : %s%n", consumeUri);
Future<Session> consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
log.info("Connecting to : {}", consumeUri);

ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
produceClient.start();
produceClient.connect(produceSocket, produceUri, produceRequest);
Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
// let it connect
Thread.sleep(1000);
Assert.assertTrue(consumerFuture.get().isOpen());
Assert.assertTrue(producerFuture.get().isOpen());

consumeSocket.awaitClose(1, TimeUnit.SECONDS);
produceSocket.awaitClose(1, TimeUnit.SECONDS);

Assert.assertTrue(produceSocket.getBuffer().size() > 0);
Assert.assertEquals(produceSocket.getBuffer(), consumeSocket.getBuffer());
} catch (Throwable t) {
log.error(t.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,49 +19,58 @@
import static org.mockito.Mockito.spy;

import java.net.URI;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import com.yahoo.pulsar.client.api.ProducerConsumerBase;
import com.yahoo.pulsar.websocket.WebSocketService;
import com.yahoo.pulsar.websocket.service.ProxyServer;
import com.yahoo.pulsar.websocket.service.WebSocketServiceStarter;

public class ProxyPublishConsumeTest extends MockedPulsarServiceBaseTest {
public class ProxyPublishConsumeTest extends ProducerConsumerBase {
protected String methodName;
private static final String CONSUME_URI = "ws://localhost:6080/ws/consumer/persistent/my-property/cluster1/my-ns/my-topic/my-sub";
private static final String PRODUCE_URI = "ws://localhost:6080/ws/producer/persistent/my-property/cluster1/my-ns/my-topic/";
private static final String CONSUME_URI = "ws://localhost:6080/ws/consumer/persistent/my-property/use/my-ns/my-topic/my-sub";
private static final String PRODUCE_URI = "ws://localhost:6080/ws/producer/persistent/my-property/use/my-ns/my-topic/";
private static final int TEST_PORT = 6080;
private ProxyServer proxyServer;
private WebSocketService service;

@BeforeClass
public void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();

ServiceConfiguration config = new ServiceConfiguration();
config.setWebServicePort(TEST_PORT);
config.setClusterName("use");
service = spy(new WebSocketService(config));
doReturn(mockZooKeeperClientFactory).when(service).getZooKeeperClientFactory();
service.start();
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
log.info("Proxy Server Started");
}

@Override
@AfterClass
protected void cleanup() throws Exception {
super.internalCleanup();
service.close();
proxyServer.stop();
log.info("Finished Cleaning Up Test setup");

}

@Test
public void socketTest() throws InterruptedException {
public void socketTest() throws Exception {
URI consumeUri = URI.create(CONSUME_URI);
URI produceUri = URI.create(PRODUCE_URI);

Expand All @@ -73,19 +82,21 @@ public void socketTest() throws InterruptedException {
try {
consumeClient.start();
ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();
consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
log.info("Connecting to : %s%n", consumeUri);
Future<Session> consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
log.info("Connecting to : {}", consumeUri);

ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
produceClient.start();
produceClient.connect(produceSocket, produceUri, produceRequest);

Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
// let it connect
Thread.sleep(1000);
Assert.assertTrue(consumerFuture.get().isOpen());
Assert.assertTrue(producerFuture.get().isOpen());

consumeSocket.awaitClose(1, TimeUnit.SECONDS);
produceSocket.awaitClose(1, TimeUnit.SECONDS);

Assert.assertTrue(produceSocket.getBuffer().size() > 0);
Assert.assertEquals(produceSocket.getBuffer(), consumeSocket.getBuffer());
} catch (Throwable t) {
log.error(t.getMessage());
} finally {
try {
consumeClient.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,60 +22,67 @@
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import javax.net.ssl.KeyManager;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;

import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import com.yahoo.pulsar.client.api.ProducerConsumerBase;
import com.yahoo.pulsar.websocket.WebSocketService;
import com.yahoo.pulsar.websocket.service.ProxyServer;
import com.yahoo.pulsar.websocket.service.WebSocketServiceStarter;

import io.netty.handler.ssl.util.InsecureTrustManagerFactory;

public class ProxyPublishConsumeTls extends MockedPulsarServiceBaseTest {
public class ProxyPublishConsumeTls extends ProducerConsumerBase {
protected String methodName;
private static final String CONSUME_URI = "wss://localhost:6090/ws/consumer/persistent/my-property/cluster1/my-ns/my-topic/my-sub";
private static final String PRODUCE_URI = "wss://localhost:6090/ws/producer/persistent/my-property/cluster1/my-ns/my-topic/";
private static final String CONSUME_URI = "wss://localhost:6090/ws/consumer/persistent/my-property/use/my-ns/my-topic/my-sub";
private static final String PRODUCE_URI = "wss://localhost:6090/ws/producer/persistent/my-property/use/my-ns/my-topic/";
private static final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/certificate/server.crt";
private static final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/certificate/server.key";
private static final int TEST_PORT = 6080;
private static final int TLS_TEST_PORT = 6090;

private ProxyServer proxyServer;
private WebSocketService service;

@BeforeClass
public void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();

ServiceConfiguration config = new ServiceConfiguration();
config.setWebServicePort(TEST_PORT);
config.setWebServicePortTls(TLS_TEST_PORT);
config.setTlsEnabled(true);
config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);

config.setClusterName("use");
service = spy(new WebSocketService(config));
doReturn(mockZooKeeperClientFactory).when(service).getZooKeeperClientFactory();
service.start();

proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
log.info("Proxy Server Started");
}

@Override
@AfterClass
protected void cleanup() throws Exception {
super.internalCleanup();
service.close();
proxyServer.stop();
log.info("Finished Cleaning Up Test setup");

}
Expand All @@ -101,16 +108,20 @@ public void socketTest() throws InterruptedException, NoSuchAlgorithmException,
try {
consumeClient.start();
ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();
consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
log.info("Connecting to : %s%n", consumeUri);
Future<Session> consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
log.info("Connecting to : {}", consumeUri);

ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
produceClient.start();
produceClient.connect(produceSocket, produceUri, produceRequest);
Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
// let it connect
Thread.sleep(1000);
Assert.assertTrue(consumerFuture.get().isOpen());
Assert.assertTrue(producerFuture.get().isOpen());

consumeSocket.awaitClose(1, TimeUnit.SECONDS);
produceSocket.awaitClose(1, TimeUnit.SECONDS);

Assert.assertTrue(produceSocket.getBuffer().size() > 0);
Assert.assertEquals(produceSocket.getBuffer(), consumeSocket.getBuffer());
} catch (Throwable t) {
log.error(t.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ public boolean awaitClose(int duration, TimeUnit unit) throws InterruptedExcepti

@OnWebSocketClose
public void onClose(int statusCode, String reason) {
log.info("Connection closed: %d - %s%n", statusCode, reason);
log.info("Connection closed: {} - {}", statusCode, reason);
this.session = null;
this.closeLatch.countDown();
}

@OnWebSocketConnect
public void onConnect(Session session) throws InterruptedException {
log.info("Got connect: %s%n", session);
log.info("Got connect: {}", session);
this.session = session;
log.debug("Got connected: {}", session);
}
Expand Down
Loading

0 comments on commit ac4d658

Please sign in to comment.