From 335312bb3bd71f70626a666cbdd013c7b4f2daf0 Mon Sep 17 00:00:00 2001 From: dengzhiwen1 Date: Wed, 26 Jun 2024 16:10:08 +0800 Subject: [PATCH] fix: ack msg which has reached maxReconsumeTimes --- .../ConsumeMessagePopConcurrentlyService.java | 2 +- .../impl/consumer/DefaultMQPushConsumerImpl.java | 14 ++++++++++++-- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopConcurrentlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopConcurrentlyService.java index 3713d1aba4d..d5191871106 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopConcurrentlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopConcurrentlyService.java @@ -471,7 +471,7 @@ public void run() { processQueue.decFoundMsg(-msgs.size()); } - log.warn("processQueue invalid. isDropped={}, isPopTimeout={}, messageQueue={}, msgs={}", + log.warn("processQueue invalid or popTimeout. isDropped={}, isPopTimeout={}, messageQueue={}, msgs={}", processQueue.isDropped(), isPopTimeout(), messageQueue, msgs); } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index 3e832e5a9a3..e66a9825f3d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -621,10 +621,9 @@ public void onException(Throwable e) { private PopResult processPopResult(final PopResult popResult, final SubscriptionData subscriptionData) { if (PopStatus.FOUND == popResult.getPopStatus()) { List msgFoundList = popResult.getMsgFoundList(); - List msgListFilterAgain = msgFoundList; + List msgListFilterAgain = new ArrayList<>(popResult.getMsgFoundList().size()); if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode() && popResult.getMsgFoundList().size() > 0) { - msgListFilterAgain = new ArrayList<>(popResult.getMsgFoundList().size()); for (MessageExt msg : popResult.getMsgFoundList()) { if (msg.getTags() != null) { if (subscriptionData.getTagsSet().contains(msg.getTags())) { @@ -632,6 +631,8 @@ private PopResult processPopResult(final PopResult popResult, final Subscription } } } + } else { + msgListFilterAgain.addAll(msgFoundList); } if (!this.filterMessageHookList.isEmpty()) { @@ -649,6 +650,15 @@ private PopResult processPopResult(final PopResult popResult, final Subscription } } + Iterator iterator = msgListFilterAgain.iterator(); + while (iterator.hasNext()) { + MessageExt msg = iterator.next(); + if (msg.getReconsumeTimes() > defaultMQPushConsumer.getMaxReconsumeTimes()) { + iterator.remove(); + log.info("Reconsume times has reached {}, so ack msg={}", msg.getReconsumeTimes(), msg); + } + } + if (msgFoundList.size() != msgListFilterAgain.size()) { for (MessageExt msg : msgFoundList) { if (!msgListFilterAgain.contains(msg)) {