Skip to content

Commit

Permalink
Revert "[fix][client] Messages with inconsistent consumer epochs are …
Browse files Browse the repository at this point in the history
…not filtered when using batch receive and trigger timeout. (apache#17318)"

This reverts commit 12e78b2.
  • Loading branch information
Technoboy- committed Nov 15, 2022
1 parent a69e29d commit b0668b2
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 311 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -193,42 +193,4 @@ public void testShouldMaintainOrderForIndividualTopicInMultiTopicsConsumer()
}
Assert.assertEquals(numPartitions * numMessages, receivedCount);
}

@Test
public void testBatchReceiveAckTimeout()
throws PulsarAdminException, PulsarClientException {
String topicName = newTopicName();
int numPartitions = 2;
int numMessages = 100000;
admin.topics().createPartitionedTopic(topicName, numPartitions);

@Cleanup
Producer<Long> producer = pulsarClient.newProducer(Schema.INT64)
.topic(topicName)
.enableBatching(false)
.blockIfQueueFull(true)
.create();

@Cleanup
Consumer<Long> consumer = pulsarClient
.newConsumer(Schema.INT64)
.topic(topicName)
.receiverQueueSize(numMessages)
.batchReceivePolicy(
BatchReceivePolicy.builder().maxNumMessages(1).timeout(2, TimeUnit.SECONDS).build()
).ackTimeout(1000, TimeUnit.MILLISECONDS)
.subscriptionName(methodName)
.subscribe();

producer.newMessage()
.value(1l)
.send();

// first batch receive
Assert.assertEquals(consumer.batchReceive().size(), 1);
// Not ack, trigger redelivery this message.
Awaitility.await().untilAsserted(() -> {
Assert.assertEquals(consumer.batchReceive().size(), 1);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import java.lang.reflect.Field;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
Expand All @@ -32,17 +33,17 @@
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -225,15 +226,13 @@ public void testDoNotRedeliveryMarkDeleteMessages() throws PulsarClientException
final String topic = "testDoNotRedeliveryMarkDeleteMessages";
final String subName = "my-sub";

@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Key_Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.subscribe();

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.enableBatching(false)
Expand Down Expand Up @@ -262,15 +261,12 @@ public void testDoNotRedeliveryMarkDeleteMessages() throws PulsarClientException
public void testRedeliveryAddEpoch(boolean enableBatch) throws Exception{
final String topic = "testRedeliveryAddEpoch";
final String subName = "my-sub";

@Cleanup
ConsumerImpl<String> consumer = ((ConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING)
ConsumerBase<String> consumer = ((ConsumerBase<String>) pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Failover)
.subscribe());

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(enableBatch)
Expand All @@ -279,9 +275,14 @@ public void testRedeliveryAddEpoch(boolean enableBatch) throws Exception{
String test1 = "Pulsar1";
String test2 = "Pulsar2";
String test3 = "Pulsar3";
producer.send(test1);

PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopics()
.get(TopicName.get("persistent://public/default/" + topic).toString()).get().get();
PersistentDispatcherSingleActiveConsumer persistentDispatcherSingleActiveConsumer =
(PersistentDispatcherSingleActiveConsumer) persistentTopic.getSubscription(subName).getDispatcher();

consumer.setConsumerEpoch(1);
producer.send(test1);
Message<String> message = consumer.receive(3, TimeUnit.SECONDS);
assertNull(message);
consumer.redeliverUnacknowledgedMessages();
Expand All @@ -308,113 +309,18 @@ public void testRedeliveryAddEpoch(boolean enableBatch) throws Exception{
message = consumer.receive(3, TimeUnit.SECONDS);
assertNull(message);

ConnectionHandler connectionHandler = consumer.getConnectionHandler();
connectionHandler.cnx().channel().close();

consumer.grabCnx();

message = consumer.receive(3, TimeUnit.SECONDS);
assertNotNull(message);
assertEquals(message.getValue(), test3);
}

@Test(dataProvider = "enableBatch")
public void testRedeliveryAddEpochAndPermits(boolean enableBatch) throws Exception {
final String topic = "testRedeliveryAddEpochAndPermits";
final String subName = "my-sub";
// set receive queue size is 4, and first send 4 messages,
// then call redeliver messages, assert receive msg num.
int receiveQueueSize = 4;

@Cleanup
ConsumerImpl<String> consumer = ((ConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.receiverQueueSize(receiveQueueSize)
.autoScaledReceiverQueueSizeEnabled(false)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Failover)
.subscribe());

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(enableBatch)
.create();

consumer.setConsumerEpoch(1);
for (int i = 0; i < receiveQueueSize; i++) {
producer.send("pulsar" + i);
}
assertNull(consumer.receive(1, TimeUnit.SECONDS));

consumer.redeliverUnacknowledgedMessages();
for (int i = 0; i < receiveQueueSize; i++) {
Message<String> msg = consumer.receive();
assertEquals("pulsar" + i, msg.getValue());
}
}

@Test(dataProvider = "enableBatch")
public void testBatchReceiveRedeliveryAddEpoch(boolean enableBatch) throws Exception{
final String topic = "testBatchReceiveRedeliveryAddEpoch";
final String subName = "my-sub";

@Cleanup
ConsumerImpl<String> consumer = ((ConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName(subName)
.batchReceivePolicy(BatchReceivePolicy.builder().timeout(1000, TimeUnit.MILLISECONDS).build())
.subscriptionType(SubscriptionType.Failover)
.subscribe());

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(enableBatch)
.create();

String test1 = "Pulsar1";
String test2 = "Pulsar2";
String test3 = "Pulsar3";

consumer.setConsumerEpoch(1);
producer.send(test1);

Messages<String> messages;
Message<String> message;

messages = consumer.batchReceive();
assertEquals(messages.size(), 0);
consumer.redeliverUnacknowledgedMessages();
messages = consumer.batchReceive();
assertEquals(messages.size(), 1);
message = messages.iterator().next();
consumer.acknowledgeCumulativeAsync(message).get();
assertEquals(message.getValue(), test1);

consumer.setConsumerEpoch(3);
producer.send(test2);
messages = consumer.batchReceive();
assertEquals(messages.size(), 0);
consumer.redeliverUnacknowledgedMessages();
messages = consumer.batchReceive();
assertEquals(messages.size(), 1);
message = messages.iterator().next();
assertEquals(message.getValue(), test2);
consumer.acknowledgeCumulativeAsync(message).get();
Field field = consumer.getClass().getDeclaredField("connectionHandler");
field.setAccessible(true);
ConnectionHandler connectionHandler = (ConnectionHandler) field.get(consumer);

consumer.setConsumerEpoch(6);
producer.send(test3);
messages = consumer.batchReceive();
assertEquals(messages.size(), 0);
field = connectionHandler.getClass().getDeclaredField("CLIENT_CNX_UPDATER");
field.setAccessible(true);

ConnectionHandler connectionHandler = consumer.getConnectionHandler();
connectionHandler.cnx().channel().close();

consumer.grabCnx();
messages = consumer.batchReceive();
assertEquals(messages.size(), 1);
message = messages.iterator().next();
((ConsumerImpl<String>) consumer).grabCnx();
message = consumer.receive(3, TimeUnit.SECONDS);
assertNotNull(message);
assertEquals(message.getValue(), test3);
}

Expand All @@ -423,21 +329,19 @@ public static Object[][] enableBatch() {
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
}


@Test(dataProvider = "enableBatch")
public void testMultiConsumerRedeliveryAddEpoch(boolean enableBatch) throws Exception{
final String topic = "testMultiConsumerRedeliveryAddEpoch";
final String subName = "my-sub";
admin.topics().createPartitionedTopic(topic, 5);
final int messageNumber = 50;

@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Failover)
.subscribe();

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(enableBatch)
Expand Down Expand Up @@ -478,66 +382,4 @@ public void testMultiConsumerRedeliveryAddEpoch(boolean enableBatch) throws Exce
message = consumer.receive(5, TimeUnit.SECONDS);
assertNull(message);
}

@Test(dataProvider = "enableBatch", invocationCount = 10)
public void testMultiConsumerBatchRedeliveryAddEpoch(boolean enableBatch) throws Exception{

final String topic = "testMultiConsumerBatchRedeliveryAddEpoch";
final String subName = "my-sub";
admin.topics().createPartitionedTopic(topic, 5);
final int messageNumber = 50;

@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.batchReceivePolicy(BatchReceivePolicy.builder().timeout(2, TimeUnit.SECONDS).build())
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Failover)
.subscribe();

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(enableBatch)
.create();

for (int i = 0; i < messageNumber; i++) {
producer.send("" + i);
}

int receiveNum = 0;
while (receiveNum < messageNumber) {
receiveNum += consumer.batchReceive().size();
}

// redeliverUnacknowledgedMessages once
consumer.redeliverUnacknowledgedMessages();

receiveNum = 0;
while (receiveNum < messageNumber) {
Messages<String> messages = consumer.batchReceive();
receiveNum += messages.size();
for (Message<String> message : messages) {
assertEquals((((MessageImpl)((TopicMessageImpl) message).getMessage())).getConsumerEpoch(), 1);
}
}

// can't receive message again
assertEquals(consumer.batchReceive().size(), 0);

// redeliverUnacknowledgedMessages twice
consumer.redeliverUnacknowledgedMessages();

receiveNum = 0;
while (receiveNum < messageNumber) {
Messages<String> messages = consumer.batchReceive();
receiveNum += messages.size();
for (Message<String> message : messages) {
assertEquals((((MessageImpl)((TopicMessageImpl) message).getMessage())).getConsumerEpoch(), 2);
}
}

// can't receive message again
assertEquals(consumer.batchReceive().size(), 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -907,7 +907,7 @@ protected void notifyPendingBatchReceivedCallBack() {
if (opBatchReceive == null) {
return;
}
notifyPendingBatchReceivedCallBack(opBatchReceive.future);
notifyPendingBatchReceivedCallBack(opBatchReceive);
}

private boolean hasNextBatchReceive() {
Expand All @@ -932,7 +932,7 @@ private OpBatchReceive<T> nextBatchReceive() {
return opBatchReceive;
}

protected final void notifyPendingBatchReceivedCallBack(CompletableFuture<Messages<T>> batchReceiveFuture) {
protected final void notifyPendingBatchReceivedCallBack(OpBatchReceive<T> opBatchReceive) {
MessagesImpl<T> messages = getNewMessagesImpl();
Message<T> msgPeeked = incomingMessages.peek();
while (msgPeeked != null && messages.canAdd(msgPeeked)) {
Expand All @@ -944,7 +944,8 @@ protected final void notifyPendingBatchReceivedCallBack(CompletableFuture<Messag
}
msgPeeked = incomingMessages.peek();
}
completePendingBatchReceive(batchReceiveFuture, messages);

completePendingBatchReceive(opBatchReceive.future, messages);
}

protected void completePendingBatchReceive(CompletableFuture<Messages<T>> future, Messages<T> messages) {
Expand Down Expand Up @@ -1172,7 +1173,7 @@ protected boolean isValidConsumerEpoch(MessageImpl<T> message) {
|| getSubType() == CommandSubscribe.SubType.Exclusive)
&& message.getConsumerEpoch() != DEFAULT_CONSUMER_EPOCH
&& message.getConsumerEpoch() < CONSUMER_EPOCH.get(this)) {
log.info("Consumer filter old epoch message, topic : [{}], messageId : [{}], messageConsumerEpoch : [{}], "
log.warn("Consumer filter old epoch message, topic : [{}], messageId : [{}], messageConsumerEpoch : [{}], "
+ "consumerEpoch : [{}]", topic, message.getMessageId(), message.getConsumerEpoch(), consumerEpoch);
message.release();
message.recycle();
Expand Down
Loading

0 comments on commit b0668b2

Please sign in to comment.