Skip to content

Commit

Permalink
Fix unit test review code.
Browse files Browse the repository at this point in the history
  • Loading branch information
shibd committed Sep 6, 2022
1 parent 1df6349 commit b4e7a52
Showing 1 changed file with 7 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ public void testDoNotRedeliveryMarkDeleteMessages() throws PulsarClientException
public void testRedeliveryAddEpoch(boolean enableBatch) throws Exception{
final String topic = "testRedeliveryAddEpoch";
final String subName = "my-sub";
ConsumerBase<String> consumer = ((ConsumerBase<String>) pulsarClient.newConsumer(Schema.STRING)
ConsumerImpl<String> consumer = ((ConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Failover)
Expand All @@ -279,11 +279,6 @@ public void testRedeliveryAddEpoch(boolean enableBatch) throws Exception{
String test3 = "Pulsar3";
producer.send(test1);

PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopics()
.get(TopicName.get("persistent://public/default/" + topic).toString()).get().get();
PersistentDispatcherSingleActiveConsumer persistentDispatcherSingleActiveConsumer =
(PersistentDispatcherSingleActiveConsumer) persistentTopic.getSubscription(subName).getDispatcher();

consumer.setConsumerEpoch(1);
Message<String> message = consumer.receive(3, TimeUnit.SECONDS);
assertNull(message);
Expand Down Expand Up @@ -313,14 +308,11 @@ public void testRedeliveryAddEpoch(boolean enableBatch) throws Exception{

Field field = consumer.getClass().getDeclaredField("connectionHandler");
field.setAccessible(true);
ConnectionHandler connectionHandler = (ConnectionHandler) field.get(consumer);

field = connectionHandler.getClass().getDeclaredField("CLIENT_CNX_UPDATER");
field.setAccessible(true);

ConnectionHandler connectionHandler = consumer.getConnectionHandler();
connectionHandler.cnx().channel().close();

((ConsumerImpl<String>) consumer).grabCnx();
consumer.grabCnx();

message = consumer.receive(3, TimeUnit.SECONDS);
assertNotNull(message);
assertEquals(message.getValue(), test3);
Expand All @@ -330,7 +322,7 @@ public void testRedeliveryAddEpoch(boolean enableBatch) throws Exception{
public void testBatchReceiveRedeliveryAddEpoch(boolean enableBatch) throws Exception{
final String topic = "testBatchReceiveRedeliveryAddEpoch";
final String subName = "my-sub";
ConsumerBase<String> consumer = ((ConsumerBase<String>) pulsarClient.newConsumer(Schema.STRING)
ConsumerImpl<String> consumer = ((ConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName(subName)
.batchReceivePolicy(BatchReceivePolicy.builder().timeout(1000, TimeUnit.MILLISECONDS).build())
Expand All @@ -347,11 +339,6 @@ public void testBatchReceiveRedeliveryAddEpoch(boolean enableBatch) throws Excep
String test3 = "Pulsar3";
producer.send(test1);

PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopics()
.get(TopicName.get("persistent://public/default/" + topic).toString()).get().get();
PersistentDispatcherSingleActiveConsumer persistentDispatcherSingleActiveConsumer =
(PersistentDispatcherSingleActiveConsumer) persistentTopic.getSubscription(subName).getDispatcher();

Messages<String> messages;
Message<String> message;

Expand Down Expand Up @@ -381,16 +368,10 @@ public void testBatchReceiveRedeliveryAddEpoch(boolean enableBatch) throws Excep
messages = consumer.batchReceive();
assertEquals(messages.size(), 0);

Field field = consumer.getClass().getDeclaredField("connectionHandler");
field.setAccessible(true);
ConnectionHandler connectionHandler = (ConnectionHandler) field.get(consumer);

field = connectionHandler.getClass().getDeclaredField("CLIENT_CNX_UPDATER");
field.setAccessible(true);

ConnectionHandler connectionHandler = consumer.getConnectionHandler();
connectionHandler.cnx().channel().close();

((ConsumerImpl<String>) consumer).grabCnx();
consumer.grabCnx();
messages = consumer.batchReceive();
assertEquals(messages.size(), 1);
message = messages.iterator().next();
Expand Down

0 comments on commit b4e7a52

Please sign in to comment.