Skip to content

Commit

Permalink
Moved spark receiver test to test-containers to avoid shading issues
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat committed Jan 4, 2019
1 parent 97d264d commit bfb9bd7
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 58 deletions.
27 changes: 7 additions & 20 deletions tests/pulsar-spark-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,32 +39,19 @@
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-spark</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-broker</artifactId>
<version>${project.version}</version>
<groupId>org.apache.pulsar.tests</groupId>
<artifactId>integration</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
</dependency>

<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-broker</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
</dependency>

<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>managed-ledger-original</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<groupId>org.testcontainers</groupId>
<artifactId>mysql</artifactId>
<scope>test</scope>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,48 +18,36 @@
*/
package org.apache.pulsar.spark;

import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.doNothing;

import org.apache.pulsar.client.api.*;
import org.apache.spark.storage.StorageLevel;
import org.mockito.ArgumentCaptor;

import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;

import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
import org.apache.spark.storage.StorageLevel;
import org.mockito.ArgumentCaptor;
import org.testng.annotations.Test;

import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;

public class SparkStreamingPulsarReceiverTest extends MockedPulsarServiceBaseTest {

private final String URL = "pulsar://127.0.0.1:" + BROKER_PORT + "/";
public class SparkStreamingPulsarReceiverTest extends PulsarTestSuite {
private static final String TOPIC = "persistent://p1/c1/ns1/topic1";
private static final String SUBS = "sub1";
private static final String EXPECTED_MESSAGE = "pulsar-spark test message";

@BeforeClass
@Override
protected void setup() throws Exception {
internalSetup();
}

@AfterClass
@Override
protected void cleanup() throws Exception {
internalCleanup();
}

@Test
public void testReceivedMessage() throws Exception {
@Test(dataProvider = "ServiceUrls")
public void testReceivedMessage(String serviceUrl) throws Exception {
ClientConfiguration clientConf = new ClientConfiguration();
ConsumerConfiguration consConf = new ConsumerConfiguration();

SparkStreamingPulsarReceiver receiver = spy(
new SparkStreamingPulsarReceiver(clientConf, consConf, URL, TOPIC, SUBS));
new SparkStreamingPulsarReceiver(clientConf, consConf, serviceUrl, TOPIC, SUBS));
MessageListener msgListener = spy(new MessageListener() {
@Override
public void received(Consumer consumer, Message msg) {
Expand All @@ -73,35 +61,38 @@ public void received(Consumer consumer, Message msg) {

receiver.onStart();
waitForTransmission();
PulsarClient pulsarClient = PulsarClient.create(URL, clientConf);
PulsarClient pulsarClient = PulsarClient.create(serviceUrl, clientConf);
Producer producer = pulsarClient.createProducer(TOPIC, new ProducerConfiguration());
producer.send(EXPECTED_MESSAGE.getBytes());
waitForTransmission();
receiver.onStop();
assertEquals(new String(msgCaptor.getValue().getData()), EXPECTED_MESSAGE);
}

@Test
public void testDefaultSettingsOfReceiver() {

@Test(dataProvider = "ServiceUrls")
public void testDefaultSettingsOfReceiver(String serviceUrl) throws Exception {
ClientConfiguration clientConf = new ClientConfiguration();
ConsumerConfiguration consConf = new ConsumerConfiguration();
SparkStreamingPulsarReceiver receiver =
new SparkStreamingPulsarReceiver(clientConf, consConf, URL, TOPIC, SUBS);
new SparkStreamingPulsarReceiver(clientConf, consConf, serviceUrl, TOPIC, SUBS);
assertEquals(receiver.storageLevel(), StorageLevel.MEMORY_AND_DISK_2());
assertEquals(consConf.getAckTimeoutMillis(), 60_000);
assertNotNull(consConf.getMessageListener());
}

@Test(expectedExceptions = NullPointerException.class,
expectedExceptionsMessageRegExp = "ClientConfiguration must not be null")
public void testReceiverWhenClientConfigurationIsNull() {
new SparkStreamingPulsarReceiver(null, new ConsumerConfiguration(), URL, TOPIC, SUBS);
expectedExceptionsMessageRegExp = "ClientConfiguration must not be null",
dataProvider = "ServiceUrls")
public void testReceiverWhenClientConfigurationIsNull(String serviceUrl) {
new SparkStreamingPulsarReceiver(null, new ConsumerConfiguration(), serviceUrl, TOPIC, SUBS);
}

@Test(expectedExceptions = NullPointerException.class,
expectedExceptionsMessageRegExp = "ConsumerConfiguration must not be null")
public void testReceiverWhenConsumerConfigurationIsNull() {
new SparkStreamingPulsarReceiver(new ClientConfiguration(), null, URL, TOPIC, SUBS);
expectedExceptionsMessageRegExp = "ConsumerConfiguration must not be null",
dataProvider = "ServiceUrls")
public void testReceiverWhenConsumerConfigurationIsNull(String serviceUrl) {
new SparkStreamingPulsarReceiver(new ClientConfiguration(), null, serviceUrl, TOPIC, SUBS);
}

private static void waitForTransmission() {
Expand Down

0 comments on commit bfb9bd7

Please sign in to comment.