From f9ea22fc4f52642f7b320f90343c265737b398cb Mon Sep 17 00:00:00 2001 From: leizhiyuan Date: Thu, 29 Aug 2024 15:20:06 +0800 Subject: [PATCH] =?UTF-8?q?fix:when=20isPopShouldStop=20hit=EF=BC=8C=20unl?= =?UTF-8?q?ock=20queueLockManager?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../apache/rocketmq/broker/processor/PopMessageProcessor.java | 1 + 1 file changed, 1 insertion(+) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java index 6073023722a4..c7397f584505 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java @@ -543,6 +543,7 @@ private CompletableFuture popMsgFromQueue(String topic, String attemptId, if (isPopShouldStop(topic, requestHeader.getConsumerGroup(), queueId)) { POP_LOGGER.warn("Too much msgs unacked, then stop poping. topic={}, group={}, queueId={}", topic, requestHeader.getConsumerGroup(), queueId); restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum; + future.whenComplete((result, throwable) -> queueLockManager.unLock(lockKey)); future.complete(restNum); return future; }