diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java index 91eddc94fb060c..29b06f68b64ebe 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java @@ -429,12 +429,14 @@ public void testMultiConsumerRedeliveryAddEpoch(boolean enableBatch) throws Exce final String subName = "my-sub"; admin.topics().createPartitionedTopic(topic, 5); final int messageNumber = 50; + @Cleanup Consumer consumer = pulsarClient.newConsumer(Schema.STRING) .topic(topic) .subscriptionName(subName) .subscriptionType(SubscriptionType.Failover) .subscribe(); + @Cleanup Producer producer = pulsarClient.newProducer(Schema.STRING) .topic(topic) .enableBatching(enableBatch) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index fd64464b3a1247..c55bc0aa367853 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -460,10 +460,6 @@ protected Message internalReceive() throws PulsarClientException { } } - boolean isValidConsumerEpoch(Message message) { - return isValidConsumerEpoch((MessageImpl) message); - } - @Override protected CompletableFuture> internalReceiveAsync() { CompletableFutureCancellationHandler cancellationHandler = new CompletableFutureCancellationHandler(); @@ -1921,13 +1917,6 @@ public void redeliverUnacknowledgedMessages() { } } - public int clearIncomingMessagesAndGetMessageNumber() { - int messagesNumber = incomingMessages.size(); - clearIncomingMessages(); - unAckedMessageTracker.clear(); - return messagesNumber; - } - @Override public void redeliverUnacknowledgedMessages(Set messageIds) { if (messageIds.isEmpty()) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageRedeliveryTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageRedeliveryTracker.java index 1405e279f8fe58..64248da921f966 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageRedeliveryTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageRedeliveryTracker.java @@ -60,6 +60,7 @@ public UnAckedMessageRedeliveryTracker(PulsarClientImpl client, ConsumerBase @Override public void run(Timeout t) throws Exception { writeLock.lock(); + Set messageIds = null; try { HashSet headPartition = redeliveryTimePartitions.removeFirst(); if (!headPartition.isEmpty()) { @@ -71,9 +72,13 @@ public void run(Timeout t) throws Exception { } headPartition.clear(); redeliveryTimePartitions.addLast(headPartition); - triggerRedelivery(consumerBase); + messageIds = getRedeliveryMessages(consumerBase); } finally { writeLock.unlock(); + if (messageIds != null && !messageIds.isEmpty()) { + consumerBase.onAckTimeoutSend(messageIds); + consumerBase.redeliverUnacknowledgedMessages(messageIds); + } timeout = client.timer().newTimeout(this, tickDurationInMs, TimeUnit.MILLISECONDS); } } @@ -93,35 +98,29 @@ private void addAckTimeoutMessages(UnackMessageIdWrapper messageIdWrapper) { } } - private void triggerRedelivery(ConsumerBase consumerBase) { + private Set getRedeliveryMessages(ConsumerBase consumerBase) { if (ackTimeoutMessages.isEmpty()) { - return; + return null; } Set messageIds = TL_MESSAGE_IDS_SET.get(); messageIds.clear(); - try { - long now = System.currentTimeMillis(); - ackTimeoutMessages.forEach((messageId, timestamp) -> { - if (timestamp <= now) { - addChunkedMessageIdsAndRemoveFromSequenceMap(messageId, messageIds, consumerBase); - messageIds.add(messageId); - } - }); - if (!messageIds.isEmpty()) { - log.info("[{}] {} messages will be re-delivered", consumerBase, messageIds.size()); - Iterator iterator = messageIds.iterator(); - while (iterator.hasNext()) { - MessageId messageId = iterator.next(); - ackTimeoutMessages.remove(messageId); - } + long now = System.currentTimeMillis(); + ackTimeoutMessages.forEach((messageId, timestamp) -> { + if (timestamp <= now) { + addChunkedMessageIdsAndRemoveFromSequenceMap(messageId, messageIds, consumerBase); + messageIds.add(messageId); } - } finally { - if (messageIds.size() > 0) { - consumerBase.onAckTimeoutSend(messageIds); - consumerBase.redeliverUnacknowledgedMessages(messageIds); + }); + if (!messageIds.isEmpty()) { + log.info("[{}] {} messages will be re-delivered", consumerBase, messageIds.size()); + Iterator iterator = messageIds.iterator(); + while (iterator.hasNext()) { + MessageId messageId = iterator.next(); + ackTimeoutMessages.remove(messageId); } } + return messageIds; } @Override