From 07cd25cd359e6f745d247f163f73abc628902e33 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Tue, 15 Nov 2022 23:35:14 +0800 Subject: [PATCH] fix --- .../pulsar/client/impl/MessageRedeliveryTest.java | 2 ++ .../org/apache/pulsar/client/impl/ConsumerBase.java | 4 ---- .../org/apache/pulsar/client/impl/ConsumerImpl.java | 11 ----------- 3 files changed, 2 insertions(+), 15 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java index 91eddc94fb060c..29b06f68b64ebe 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java @@ -429,12 +429,14 @@ public void testMultiConsumerRedeliveryAddEpoch(boolean enableBatch) throws Exce final String subName = "my-sub"; admin.topics().createPartitionedTopic(topic, 5); final int messageNumber = 50; + @Cleanup Consumer consumer = pulsarClient.newConsumer(Schema.STRING) .topic(topic) .subscriptionName(subName) .subscriptionType(SubscriptionType.Failover) .subscribe(); + @Cleanup Producer producer = pulsarClient.newProducer(Schema.STRING) .topic(topic) .enableBatching(enableBatch) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 27b81f5c67c711..e01a7efecc5ec0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -945,10 +945,6 @@ protected final void notifyPendingBatchReceivedCallBack(CompletableFuture msg = incomingMessages.poll(); if (msg != null) { messageProcessed(msg); - if (!isValidConsumerEpoch((MessageImpl) msg)) { - msgPeeked = incomingMessages.peek(); - continue; - } Message interceptMsg = beforeConsume(msg); messages.add(interceptMsg); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index fd64464b3a1247..c55bc0aa367853 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -460,10 +460,6 @@ protected Message internalReceive() throws PulsarClientException { } } - boolean isValidConsumerEpoch(Message message) { - return isValidConsumerEpoch((MessageImpl) message); - } - @Override protected CompletableFuture> internalReceiveAsync() { CompletableFutureCancellationHandler cancellationHandler = new CompletableFutureCancellationHandler(); @@ -1921,13 +1917,6 @@ public void redeliverUnacknowledgedMessages() { } } - public int clearIncomingMessagesAndGetMessageNumber() { - int messagesNumber = incomingMessages.size(); - clearIncomingMessages(); - unAckedMessageTracker.clear(); - return messagesNumber; - } - @Override public void redeliverUnacknowledgedMessages(Set messageIds) { if (messageIds.isEmpty()) {