Skip to content

Commit

Permalink
Messages with inconsistent consumer epochs are not filtered when usin…
Browse files Browse the repository at this point in the history
…g batch receive and trigger timeout
  • Loading branch information
Technoboy- committed Nov 15, 2022
1 parent 7975023 commit d237f50
Show file tree
Hide file tree
Showing 5 changed files with 262 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.awaitility.Awaitility;
import org.mockito.AdditionalAnswers;
import org.mockito.Mockito;
import org.slf4j.Logger;
Expand Down Expand Up @@ -192,4 +193,42 @@ public void testShouldMaintainOrderForIndividualTopicInMultiTopicsConsumer()
}
Assert.assertEquals(numPartitions * numMessages, receivedCount);
}

@Test
public void testBatchReceiveAckTimeout()
throws PulsarAdminException, PulsarClientException {
String topicName = newTopicName();
int numPartitions = 2;
int numMessages = 100000;
admin.topics().createPartitionedTopic(topicName, numPartitions);

@Cleanup
Producer<Long> producer = pulsarClient.newProducer(Schema.INT64)
.topic(topicName)
.enableBatching(false)
.blockIfQueueFull(true)
.create();

@Cleanup
Consumer<Long> consumer = pulsarClient
.newConsumer(Schema.INT64)
.topic(topicName)
.receiverQueueSize(numMessages)
.batchReceivePolicy(
BatchReceivePolicy.builder().maxNumMessages(1).timeout(2, TimeUnit.SECONDS).build()
).ackTimeout(1000, TimeUnit.MILLISECONDS)
.subscriptionName(methodName)
.subscribe();

producer.newMessage()
.value(1l)
.send();

// first batch receive
Assert.assertEquals(consumer.batchReceive().size(), 1);
// Not ack, trigger redelivery this message.
Awaitility.await().untilAsserted(() -> {
Assert.assertEquals(consumer.batchReceive().size(), 1);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import java.lang.reflect.Field;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
Expand All @@ -33,17 +32,17 @@
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -226,13 +225,15 @@ public void testDoNotRedeliveryMarkDeleteMessages() throws PulsarClientException
final String topic = "testDoNotRedeliveryMarkDeleteMessages";
final String subName = "my-sub";

@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Key_Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.subscribe();

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.enableBatching(false)
Expand Down Expand Up @@ -261,12 +262,14 @@ public void testDoNotRedeliveryMarkDeleteMessages() throws PulsarClientException
public void testRedeliveryAddEpoch(boolean enableBatch) throws Exception{
final String topic = "testRedeliveryAddEpoch";
final String subName = "my-sub";
ConsumerBase<String> consumer = ((ConsumerBase<String>) pulsarClient.newConsumer(Schema.STRING)
@Cleanup
ConsumerImpl<String> consumer = ((ConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Failover)
.subscribe());

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(enableBatch)
Expand All @@ -275,14 +278,9 @@ public void testRedeliveryAddEpoch(boolean enableBatch) throws Exception{
String test1 = "Pulsar1";
String test2 = "Pulsar2";
String test3 = "Pulsar3";
producer.send(test1);

PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopics()
.get(TopicName.get("persistent://public/default/" + topic).toString()).get().get();
PersistentDispatcherSingleActiveConsumer persistentDispatcherSingleActiveConsumer =
(PersistentDispatcherSingleActiveConsumer) persistentTopic.getSubscription(subName).getDispatcher();

consumer.setConsumerEpoch(1);
producer.send(test1);
Message<String> message = consumer.receive(3, TimeUnit.SECONDS);
assertNull(message);
consumer.redeliverUnacknowledgedMessages();
Expand All @@ -309,21 +307,116 @@ public void testRedeliveryAddEpoch(boolean enableBatch) throws Exception{
message = consumer.receive(3, TimeUnit.SECONDS);
assertNull(message);

Field field = consumer.getClass().getDeclaredField("connectionHandler");
field.setAccessible(true);
ConnectionHandler connectionHandler = (ConnectionHandler) field.get(consumer);

field = connectionHandler.getClass().getDeclaredField("CLIENT_CNX_UPDATER");
field.setAccessible(true);
ConnectionHandler connectionHandler = consumer.getConnectionHandler();

connectionHandler.cnx().channel().close();

((ConsumerImpl<String>) consumer).grabCnx();
consumer.grabCnx();
message = consumer.receive(3, TimeUnit.SECONDS);
assertNotNull(message);
assertEquals(message.getValue(), test3);
}

@Test(dataProvider = "enableBatch")
public void testRedeliveryAddEpochAndPermits(boolean enableBatch) throws Exception {
final String topic = "testRedeliveryAddEpochAndPermits";
final String subName = "my-sub";
// set receive queue size is 4, and first send 4 messages,
// then call redeliver messages, assert receive msg num.
int receiveQueueSize = 4;

@Cleanup
ConsumerImpl<String> consumer = ((ConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.receiverQueueSize(receiveQueueSize)
.autoScaledReceiverQueueSizeEnabled(false)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Failover)
.subscribe());

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(enableBatch)
.create();

consumer.setConsumerEpoch(1);
for (int i = 0; i < receiveQueueSize; i++) {
producer.send("pulsar" + i);
}
assertNull(consumer.receive(1, TimeUnit.SECONDS));

consumer.redeliverUnacknowledgedMessages();
for (int i = 0; i < receiveQueueSize; i++) {
Message<String> msg = consumer.receive();
assertEquals("pulsar" + i, msg.getValue());
}
}

@Test(dataProvider = "enableBatch")
public void testBatchReceiveRedeliveryAddEpoch(boolean enableBatch) throws Exception{
final String topic = "testBatchReceiveRedeliveryAddEpoch";
final String subName = "my-sub";

@Cleanup
ConsumerImpl<String> consumer = ((ConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName(subName)
.batchReceivePolicy(BatchReceivePolicy.builder().timeout(1000, TimeUnit.MILLISECONDS).build())
.subscriptionType(SubscriptionType.Failover)
.subscribe());

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(enableBatch)
.create();

String test1 = "Pulsar1";
String test2 = "Pulsar2";
String test3 = "Pulsar3";

consumer.setConsumerEpoch(1);
producer.send(test1);

Messages<String> messages;
Message<String> message;

messages = consumer.batchReceive();
assertEquals(messages.size(), 0);
consumer.redeliverUnacknowledgedMessages();
messages = consumer.batchReceive();
assertEquals(messages.size(), 1);
message = messages.iterator().next();
consumer.acknowledgeCumulativeAsync(message).get();
assertEquals(message.getValue(), test1);

consumer.setConsumerEpoch(3);
producer.send(test2);
messages = consumer.batchReceive();
assertEquals(messages.size(), 0);
consumer.redeliverUnacknowledgedMessages();
messages = consumer.batchReceive();
assertEquals(messages.size(), 1);
message = messages.iterator().next();
assertEquals(message.getValue(), test2);
consumer.acknowledgeCumulativeAsync(message).get();

consumer.setConsumerEpoch(6);
producer.send(test3);
messages = consumer.batchReceive();
assertEquals(messages.size(), 0);

ConnectionHandler connectionHandler = consumer.getConnectionHandler();
connectionHandler.cnx().channel().close();

consumer.grabCnx();
messages = consumer.batchReceive();
assertEquals(messages.size(), 1);
message = messages.iterator().next();
assertEquals(message.getValue(), test3);
}

@DataProvider(name = "enableBatch")
public static Object[][] enableBatch() {
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
Expand Down Expand Up @@ -382,4 +475,66 @@ public void testMultiConsumerRedeliveryAddEpoch(boolean enableBatch) throws Exce
message = consumer.receive(5, TimeUnit.SECONDS);
assertNull(message);
}

@Test(dataProvider = "enableBatch", invocationCount = 10)
public void testMultiConsumerBatchRedeliveryAddEpoch(boolean enableBatch) throws Exception{

final String topic = "testMultiConsumerBatchRedeliveryAddEpoch";
final String subName = "my-sub";
admin.topics().createPartitionedTopic(topic, 5);
final int messageNumber = 50;

@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.batchReceivePolicy(BatchReceivePolicy.builder().timeout(2, TimeUnit.SECONDS).build())
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Failover)
.subscribe();

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(enableBatch)
.create();

for (int i = 0; i < messageNumber; i++) {
producer.send("" + i);
}

int receiveNum = 0;
while (receiveNum < messageNumber) {
receiveNum += consumer.batchReceive().size();
}

// redeliverUnacknowledgedMessages once
consumer.redeliverUnacknowledgedMessages();

receiveNum = 0;
while (receiveNum < messageNumber) {
Messages<String> messages = consumer.batchReceive();
receiveNum += messages.size();
for (Message<String> message : messages) {
assertEquals((((MessageImpl)((TopicMessageImpl) message).getMessage())).getConsumerEpoch(), 1);
}
}

// can't receive message again
assertEquals(consumer.batchReceive().size(), 0);

// redeliverUnacknowledgedMessages twice
consumer.redeliverUnacknowledgedMessages();

receiveNum = 0;
while (receiveNum < messageNumber) {
Messages<String> messages = consumer.batchReceive();
receiveNum += messages.size();
for (Message<String> message : messages) {
assertEquals((((MessageImpl)((TopicMessageImpl) message).getMessage())).getConsumerEpoch(), 2);
}
}

// can't receive message again
assertEquals(consumer.batchReceive().size(), 0);
}
}
Loading

0 comments on commit d237f50

Please sign in to comment.