Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][client] Revert "[fix][client] Messages with inconsistent consumer epochs are not filtered when using batch receive and trigger timeout #18475

Merged
merged 2 commits into from
Nov 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.awaitility.Awaitility;
import org.mockito.AdditionalAnswers;
import org.mockito.Mockito;
import org.slf4j.Logger;
Expand Down Expand Up @@ -193,42 +192,4 @@ public void testShouldMaintainOrderForIndividualTopicInMultiTopicsConsumer()
}
Assert.assertEquals(numPartitions * numMessages, receivedCount);
}

@Test
public void testBatchReceiveAckTimeout()
throws PulsarAdminException, PulsarClientException {
String topicName = newTopicName();
int numPartitions = 2;
int numMessages = 100000;
admin.topics().createPartitionedTopic(topicName, numPartitions);

@Cleanup
Producer<Long> producer = pulsarClient.newProducer(Schema.INT64)
.topic(topicName)
.enableBatching(false)
.blockIfQueueFull(true)
.create();

@Cleanup
Consumer<Long> consumer = pulsarClient
.newConsumer(Schema.INT64)
.topic(topicName)
.receiverQueueSize(numMessages)
.batchReceivePolicy(
BatchReceivePolicy.builder().maxNumMessages(1).timeout(2, TimeUnit.SECONDS).build()
).ackTimeout(1000, TimeUnit.MILLISECONDS)
.subscriptionName(methodName)
.subscribe();

producer.newMessage()
.value(1l)
.send();

// first batch receive
Assert.assertEquals(consumer.batchReceive().size(), 1);
// Not ack, trigger redelivery this message.
Awaitility.await().untilAsserted(() -> {
Assert.assertEquals(consumer.batchReceive().size(), 1);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import java.lang.reflect.Field;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
Expand All @@ -32,17 +33,17 @@
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -225,15 +226,13 @@ public void testDoNotRedeliveryMarkDeleteMessages() throws PulsarClientException
final String topic = "testDoNotRedeliveryMarkDeleteMessages";
final String subName = "my-sub";

@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Key_Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.subscribe();

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.enableBatching(false)
Expand Down Expand Up @@ -262,15 +261,12 @@ public void testDoNotRedeliveryMarkDeleteMessages() throws PulsarClientException
public void testRedeliveryAddEpoch(boolean enableBatch) throws Exception{
final String topic = "testRedeliveryAddEpoch";
final String subName = "my-sub";

@Cleanup
ConsumerImpl<String> consumer = ((ConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING)
ConsumerBase<String> consumer = ((ConsumerBase<String>) pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Failover)
.subscribe());

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(enableBatch)
Expand All @@ -279,9 +275,14 @@ public void testRedeliveryAddEpoch(boolean enableBatch) throws Exception{
String test1 = "Pulsar1";
String test2 = "Pulsar2";
String test3 = "Pulsar3";
producer.send(test1);

PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopics()
.get(TopicName.get("persistent://public/default/" + topic).toString()).get().get();
PersistentDispatcherSingleActiveConsumer persistentDispatcherSingleActiveConsumer =
(PersistentDispatcherSingleActiveConsumer) persistentTopic.getSubscription(subName).getDispatcher();

consumer.setConsumerEpoch(1);
producer.send(test1);
Message<String> message = consumer.receive(3, TimeUnit.SECONDS);
assertNull(message);
consumer.redeliverUnacknowledgedMessages();
Expand All @@ -308,113 +309,18 @@ public void testRedeliveryAddEpoch(boolean enableBatch) throws Exception{
message = consumer.receive(3, TimeUnit.SECONDS);
assertNull(message);

ConnectionHandler connectionHandler = consumer.getConnectionHandler();
connectionHandler.cnx().channel().close();

consumer.grabCnx();

message = consumer.receive(3, TimeUnit.SECONDS);
assertNotNull(message);
assertEquals(message.getValue(), test3);
}

@Test(dataProvider = "enableBatch")
public void testRedeliveryAddEpochAndPermits(boolean enableBatch) throws Exception {
final String topic = "testRedeliveryAddEpochAndPermits";
final String subName = "my-sub";
// set receive queue size is 4, and first send 4 messages,
// then call redeliver messages, assert receive msg num.
int receiveQueueSize = 4;

@Cleanup
ConsumerImpl<String> consumer = ((ConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.receiverQueueSize(receiveQueueSize)
.autoScaledReceiverQueueSizeEnabled(false)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Failover)
.subscribe());

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(enableBatch)
.create();

consumer.setConsumerEpoch(1);
for (int i = 0; i < receiveQueueSize; i++) {
producer.send("pulsar" + i);
}
assertNull(consumer.receive(1, TimeUnit.SECONDS));

consumer.redeliverUnacknowledgedMessages();
for (int i = 0; i < receiveQueueSize; i++) {
Message<String> msg = consumer.receive();
assertEquals("pulsar" + i, msg.getValue());
}
}

@Test(dataProvider = "enableBatch")
public void testBatchReceiveRedeliveryAddEpoch(boolean enableBatch) throws Exception{
final String topic = "testBatchReceiveRedeliveryAddEpoch";
final String subName = "my-sub";

@Cleanup
ConsumerImpl<String> consumer = ((ConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName(subName)
.batchReceivePolicy(BatchReceivePolicy.builder().timeout(1000, TimeUnit.MILLISECONDS).build())
.subscriptionType(SubscriptionType.Failover)
.subscribe());

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(enableBatch)
.create();

String test1 = "Pulsar1";
String test2 = "Pulsar2";
String test3 = "Pulsar3";

consumer.setConsumerEpoch(1);
producer.send(test1);

Messages<String> messages;
Message<String> message;

messages = consumer.batchReceive();
assertEquals(messages.size(), 0);
consumer.redeliverUnacknowledgedMessages();
messages = consumer.batchReceive();
assertEquals(messages.size(), 1);
message = messages.iterator().next();
consumer.acknowledgeCumulativeAsync(message).get();
assertEquals(message.getValue(), test1);

consumer.setConsumerEpoch(3);
producer.send(test2);
messages = consumer.batchReceive();
assertEquals(messages.size(), 0);
consumer.redeliverUnacknowledgedMessages();
messages = consumer.batchReceive();
assertEquals(messages.size(), 1);
message = messages.iterator().next();
assertEquals(message.getValue(), test2);
consumer.acknowledgeCumulativeAsync(message).get();
Field field = consumer.getClass().getDeclaredField("connectionHandler");
field.setAccessible(true);
ConnectionHandler connectionHandler = (ConnectionHandler) field.get(consumer);

consumer.setConsumerEpoch(6);
producer.send(test3);
messages = consumer.batchReceive();
assertEquals(messages.size(), 0);
field = connectionHandler.getClass().getDeclaredField("CLIENT_CNX_UPDATER");
field.setAccessible(true);

ConnectionHandler connectionHandler = consumer.getConnectionHandler();
connectionHandler.cnx().channel().close();

consumer.grabCnx();
messages = consumer.batchReceive();
assertEquals(messages.size(), 1);
message = messages.iterator().next();
((ConsumerImpl<String>) consumer).grabCnx();
message = consumer.receive(3, TimeUnit.SECONDS);
assertNotNull(message);
assertEquals(message.getValue(), test3);
}

Expand All @@ -423,21 +329,19 @@ public static Object[][] enableBatch() {
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
}


@Test(dataProvider = "enableBatch")
public void testMultiConsumerRedeliveryAddEpoch(boolean enableBatch) throws Exception{
final String topic = "testMultiConsumerRedeliveryAddEpoch";
final String subName = "my-sub";
admin.topics().createPartitionedTopic(topic, 5);
final int messageNumber = 50;

@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Failover)
.subscribe();

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(enableBatch)
Expand Down Expand Up @@ -478,66 +382,4 @@ public void testMultiConsumerRedeliveryAddEpoch(boolean enableBatch) throws Exce
message = consumer.receive(5, TimeUnit.SECONDS);
assertNull(message);
}

@Test(dataProvider = "enableBatch", invocationCount = 10)
public void testMultiConsumerBatchRedeliveryAddEpoch(boolean enableBatch) throws Exception{

final String topic = "testMultiConsumerBatchRedeliveryAddEpoch";
final String subName = "my-sub";
admin.topics().createPartitionedTopic(topic, 5);
final int messageNumber = 50;

@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.batchReceivePolicy(BatchReceivePolicy.builder().timeout(2, TimeUnit.SECONDS).build())
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Failover)
.subscribe();

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(enableBatch)
.create();

for (int i = 0; i < messageNumber; i++) {
producer.send("" + i);
}

int receiveNum = 0;
while (receiveNum < messageNumber) {
receiveNum += consumer.batchReceive().size();
}

// redeliverUnacknowledgedMessages once
consumer.redeliverUnacknowledgedMessages();

receiveNum = 0;
while (receiveNum < messageNumber) {
Messages<String> messages = consumer.batchReceive();
receiveNum += messages.size();
for (Message<String> message : messages) {
assertEquals((((MessageImpl)((TopicMessageImpl) message).getMessage())).getConsumerEpoch(), 1);
}
}

// can't receive message again
assertEquals(consumer.batchReceive().size(), 0);

// redeliverUnacknowledgedMessages twice
consumer.redeliverUnacknowledgedMessages();

receiveNum = 0;
while (receiveNum < messageNumber) {
Messages<String> messages = consumer.batchReceive();
receiveNum += messages.size();
for (Message<String> message : messages) {
assertEquals((((MessageImpl)((TopicMessageImpl) message).getMessage())).getConsumerEpoch(), 2);
}
}

// can't receive message again
assertEquals(consumer.batchReceive().size(), 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -907,7 +907,7 @@ protected void notifyPendingBatchReceivedCallBack() {
if (opBatchReceive == null) {
return;
}
notifyPendingBatchReceivedCallBack(opBatchReceive.future);
notifyPendingBatchReceivedCallBack(opBatchReceive);
}

private boolean hasNextBatchReceive() {
Expand All @@ -932,7 +932,7 @@ private OpBatchReceive<T> nextBatchReceive() {
return opBatchReceive;
}

protected final void notifyPendingBatchReceivedCallBack(CompletableFuture<Messages<T>> batchReceiveFuture) {
protected final void notifyPendingBatchReceivedCallBack(OpBatchReceive<T> opBatchReceive) {
MessagesImpl<T> messages = getNewMessagesImpl();
Message<T> msgPeeked = incomingMessages.peek();
while (msgPeeked != null && messages.canAdd(msgPeeked)) {
Expand All @@ -944,7 +944,8 @@ protected final void notifyPendingBatchReceivedCallBack(CompletableFuture<Messag
}
msgPeeked = incomingMessages.peek();
}
completePendingBatchReceive(batchReceiveFuture, messages);

completePendingBatchReceive(opBatchReceive.future, messages);
}

protected void completePendingBatchReceive(CompletableFuture<Messages<T>> future, Messages<T> messages) {
Expand Down Expand Up @@ -1172,7 +1173,7 @@ protected boolean isValidConsumerEpoch(MessageImpl<T> message) {
|| getSubType() == CommandSubscribe.SubType.Exclusive)
&& message.getConsumerEpoch() != DEFAULT_CONSUMER_EPOCH
&& message.getConsumerEpoch() < CONSUMER_EPOCH.get(this)) {
log.info("Consumer filter old epoch message, topic : [{}], messageId : [{}], messageConsumerEpoch : [{}], "
log.warn("Consumer filter old epoch message, topic : [{}], messageId : [{}], messageConsumerEpoch : [{}], "
+ "consumerEpoch : [{}]", topic, message.getMessageId(), message.getConsumerEpoch(), consumerEpoch);
message.release();
message.recycle();
Expand Down
Loading