Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][client] Messages with inconsistent consumer epochs are not filtered when using batch receive and trigger timeout. #17318

Merged
merged 12 commits into from
Oct 11, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -193,4 +194,42 @@ public void testShouldMaintainOrderForIndividualTopicInMultiTopicsConsumer()
}
Assert.assertEquals(numPartitions * numMessages, receivedCount);
}

@Test
public void testBatchReceiveAckTimeout()
throws PulsarAdminException, PulsarClientException, InterruptedException {
String topicName = newTopicName();
int numPartitions = 2;
int numMessages = 100000;
admin.topics().createPartitionedTopic(topicName, numPartitions);

Producer<Long> producer = pulsarClient.newProducer(Schema.INT64)
shibd marked this conversation as resolved.
Show resolved Hide resolved
.topic(topicName)
.enableBatching(false)
.blockIfQueueFull(true)
.create();

@Cleanup
Consumer<Long> consumer = pulsarClient
.newConsumer(Schema.INT64)
.topic(topicName)
.receiverQueueSize(numMessages)
.batchReceivePolicy(
BatchReceivePolicy.builder().maxNumMessages(1).timeout(2, TimeUnit.SECONDS).build()
).ackTimeout(1000, TimeUnit.MILLISECONDS)
.subscriptionName(methodName)
.subscribe();

producer.newMessage()
.value(1l)
.send();
Thread.sleep(300);
shibd marked this conversation as resolved.
Show resolved Hide resolved

// first batch receive
Assert.assertEquals(consumer.batchReceive().size(), 1);
// Not ack, trigger redelivery this message.
Awaitility.await().untilAsserted(() -> {
Assert.assertEquals(consumer.batchReceive().size(), 1);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import java.lang.reflect.Field;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
Expand All @@ -33,17 +32,17 @@
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -261,7 +260,7 @@ public void testDoNotRedeliveryMarkDeleteMessages() throws PulsarClientException
public void testRedeliveryAddEpoch(boolean enableBatch) throws Exception{
final String topic = "testRedeliveryAddEpoch";
final String subName = "my-sub";
ConsumerBase<String> consumer = ((ConsumerBase<String>) pulsarClient.newConsumer(Schema.STRING)
ConsumerImpl<String> consumer = ((ConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Failover)
Expand All @@ -275,14 +274,9 @@ public void testRedeliveryAddEpoch(boolean enableBatch) throws Exception{
String test1 = "Pulsar1";
String test2 = "Pulsar2";
String test3 = "Pulsar3";
producer.send(test1);

PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopics()
.get(TopicName.get("persistent://public/default/" + topic).toString()).get().get();
PersistentDispatcherSingleActiveConsumer persistentDispatcherSingleActiveConsumer =
(PersistentDispatcherSingleActiveConsumer) persistentTopic.getSubscription(subName).getDispatcher();

consumer.setConsumerEpoch(1);
producer.send(test1);
Message<String> message = consumer.receive(3, TimeUnit.SECONDS);
assertNull(message);
consumer.redeliverUnacknowledgedMessages();
Expand All @@ -309,21 +303,110 @@ public void testRedeliveryAddEpoch(boolean enableBatch) throws Exception{
message = consumer.receive(3, TimeUnit.SECONDS);
assertNull(message);

Field field = consumer.getClass().getDeclaredField("connectionHandler");
field.setAccessible(true);
ConnectionHandler connectionHandler = (ConnectionHandler) field.get(consumer);

field = connectionHandler.getClass().getDeclaredField("CLIENT_CNX_UPDATER");
field.setAccessible(true);

ConnectionHandler connectionHandler = consumer.getConnectionHandler();
connectionHandler.cnx().channel().close();

((ConsumerImpl<String>) consumer).grabCnx();
consumer.grabCnx();

message = consumer.receive(3, TimeUnit.SECONDS);
assertNotNull(message);
assertEquals(message.getValue(), test3);
}

@Test(dataProvider = "enableBatch")
public void testRedeliveryAddEpochAndPermits(boolean enableBatch) throws Exception {
final String topic = "testRedeliveryAddEpochAndPermits";
final String subName = "my-sub";
// set receive queue size is 4, and first send 4 messages,
// then call redeliver messages, assert receive msg num.
int receiveQueueSize = 4;
ConsumerImpl<String> consumer = ((ConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.receiverQueueSize(receiveQueueSize)
.autoScaledReceiverQueueSizeEnabled(false)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Failover)
.subscribe());

Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(enableBatch)
.create();

consumer.setConsumerEpoch(1);
for (int i = 0; i < receiveQueueSize; i++) {
producer.send("pulsar" + i);
}
assertNull(consumer.receive(1, TimeUnit.SECONDS));

consumer.redeliverUnacknowledgedMessages();
for (int i = 0; i < receiveQueueSize; i++) {
Message<String> msg = consumer.receive();
assertEquals("pulsar" + i, msg.getValue());
}
}

@Test(dataProvider = "enableBatch")
public void testBatchReceiveRedeliveryAddEpoch(boolean enableBatch) throws Exception{
shibd marked this conversation as resolved.
Show resolved Hide resolved
final String topic = "testBatchReceiveRedeliveryAddEpoch";
final String subName = "my-sub";
ConsumerImpl<String> consumer = ((ConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName(subName)
.batchReceivePolicy(BatchReceivePolicy.builder().timeout(1000, TimeUnit.MILLISECONDS).build())
.subscriptionType(SubscriptionType.Failover)
.subscribe());

Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(enableBatch)
.create();
shibd marked this conversation as resolved.
Show resolved Hide resolved

String test1 = "Pulsar1";
String test2 = "Pulsar2";
String test3 = "Pulsar3";

consumer.setConsumerEpoch(1);
producer.send(test1);

shibd marked this conversation as resolved.
Show resolved Hide resolved
Messages<String> messages;
Message<String> message;

messages = consumer.batchReceive();
assertEquals(messages.size(), 0);
consumer.redeliverUnacknowledgedMessages();
messages = consumer.batchReceive();
assertEquals(messages.size(), 1);
message = messages.iterator().next();
consumer.acknowledgeCumulativeAsync(message).get();
assertEquals(message.getValue(), test1);

consumer.setConsumerEpoch(3);
producer.send(test2);
messages = consumer.batchReceive();
assertEquals(messages.size(), 0);
consumer.redeliverUnacknowledgedMessages();
messages = consumer.batchReceive();
assertEquals(messages.size(), 1);
message = messages.iterator().next();
assertEquals(message.getValue(), test2);
consumer.acknowledgeCumulativeAsync(message).get();

consumer.setConsumerEpoch(6);
producer.send(test3);
messages = consumer.batchReceive();
assertEquals(messages.size(), 0);

ConnectionHandler connectionHandler = consumer.getConnectionHandler();
connectionHandler.cnx().channel().close();

consumer.grabCnx();
messages = consumer.batchReceive();
assertEquals(messages.size(), 1);
message = messages.iterator().next();
assertEquals(message.getValue(), test3);
}

@DataProvider(name = "enableBatch")
public static Object[][] enableBatch() {
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -913,7 +913,7 @@ protected void notifyPendingBatchReceivedCallBack() {

reentrantLock.lock();
try {
notifyPendingBatchReceivedCallBack(opBatchReceive);
notifyPendingBatchReceivedCallBack(opBatchReceive.future);
} finally {
reentrantLock.unlock();
}
Expand Down Expand Up @@ -941,7 +941,7 @@ private OpBatchReceive<T> nextBatchReceive() {
return opBatchReceive;
}

protected final void notifyPendingBatchReceivedCallBack(OpBatchReceive<T> opBatchReceive) {
protected final void notifyPendingBatchReceivedCallBack(CompletableFuture<Messages<T>> batchReceiveFuture) {
MessagesImpl<T> messages = getNewMessagesImpl();
Message<T> msgPeeked = incomingMessages.peek();
while (msgPeeked != null && messages.canAdd(msgPeeked)) {
Expand All @@ -953,8 +953,7 @@ protected final void notifyPendingBatchReceivedCallBack(OpBatchReceive<T> opBatc
}
msgPeeked = incomingMessages.peek();
}

completePendingBatchReceive(opBatchReceive.future, messages);
completePendingBatchReceive(batchReceiveFuture, messages);
}

protected void completePendingBatchReceive(CompletableFuture<Messages<T>> future, Messages<T> messages) {
Expand Down Expand Up @@ -1182,7 +1181,7 @@ protected boolean isValidConsumerEpoch(MessageImpl<T> message) {
|| getSubType() == CommandSubscribe.SubType.Exclusive)
&& message.getConsumerEpoch() != DEFAULT_CONSUMER_EPOCH
&& message.getConsumerEpoch() < CONSUMER_EPOCH.get(this)) {
log.warn("Consumer filter old epoch message, topic : [{}], messageId : [{}], messageConsumerEpoch : [{}], "
log.info("Consumer filter old epoch message, topic : [{}], messageId : [{}], messageConsumerEpoch : [{}], "
+ "consumerEpoch : [{}]", topic, message.getMessageId(), message.getConsumerEpoch(), consumerEpoch);
message.release();
message.recycle();
Expand Down
Loading