From ed51602eceafbef9a5421e7190941ea8332c6ce8 Mon Sep 17 00:00:00 2001 From: leizhiyuan Date: Thu, 29 Aug 2024 15:20:06 +0800 Subject: [PATCH 1/3] =?UTF-8?q?fix:when=20isPopShouldStop=20hit=EF=BC=8C?= =?UTF-8?q?=20unlock=20queueLockManager?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../apache/rocketmq/broker/processor/PopMessageProcessor.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java index 6073023722a..5df554a08cb 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java @@ -539,7 +539,8 @@ private CompletableFuture popMsgFromQueue(String topic, String attemptId, future.complete(restNum); return future; } - + + future.whenComplete((result, throwable) -> queueLockManager.unLock(lockKey)); if (isPopShouldStop(topic, requestHeader.getConsumerGroup(), queueId)) { POP_LOGGER.warn("Too much msgs unacked, then stop poping. topic={}, group={}, queueId={}", topic, requestHeader.getConsumerGroup(), queueId); restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum; @@ -548,7 +549,6 @@ private CompletableFuture popMsgFromQueue(String topic, String attemptId, } try { - future.whenComplete((result, throwable) -> queueLockManager.unLock(lockKey)); offset = getPopOffset(topic, requestHeader.getConsumerGroup(), queueId, requestHeader.getInitMode(), true, lockKey, true); if (isOrder && brokerController.getConsumerOrderInfoManager().checkBlock(attemptId, topic, From 07f82822816c878544523932e788f21112a6ab3e Mon Sep 17 00:00:00 2001 From: leizhiyuan Date: Thu, 29 Aug 2024 15:57:04 +0800 Subject: [PATCH 2/3] =?UTF-8?q?fix:when=20isPopShouldStop=20hit=EF=BC=8C?= =?UTF-8?q?=20unlock=20queueLockManager?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../apache/rocketmq/broker/processor/PopMessageProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java index 5df554a08cb..7cc2c9ff772 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java @@ -539,7 +539,7 @@ private CompletableFuture popMsgFromQueue(String topic, String attemptId, future.complete(restNum); return future; } - + future.whenComplete((result, throwable) -> queueLockManager.unLock(lockKey)); if (isPopShouldStop(topic, requestHeader.getConsumerGroup(), queueId)) { POP_LOGGER.warn("Too much msgs unacked, then stop poping. topic={}, group={}, queueId={}", topic, requestHeader.getConsumerGroup(), queueId); From 484b3323fcd805e2df9ef0cbf665777f60896ef1 Mon Sep 17 00:00:00 2001 From: Zhanhui Li Date: Fri, 30 Aug 2024 09:50:41 +0800 Subject: [PATCH 3/3] fix: limit rate of appending commit in case of DLedger commit-log Signed-off-by: Zhanhui Li --- .../rocketmq/store/dledger/MessageStoreTestBase.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java index a21806ffcf6..c4d9f0727b9 100644 --- a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java +++ b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.store.dledger; +import com.google.common.util.concurrent.RateLimiter; import io.openmessaging.storage.dledger.DLedgerConfig; import io.openmessaging.storage.dledger.DLedgerServer; import java.io.File; @@ -122,7 +123,13 @@ protected DefaultMessageStore createMessageStore(String base, boolean createAbor } protected void doPutMessages(MessageStore messageStore, String topic, int queueId, int num, long beginLogicsOffset) throws UnknownHostException { + RateLimiter rateLimiter = RateLimiter.create(100); + MessageStoreConfig storeConfig = messageStore.getMessageStoreConfig(); + boolean limitAppendRate = storeConfig.isEnableDLegerCommitLog(); for (int i = 0; i < num; i++) { + if (limitAppendRate) { + rateLimiter.acquire(); + } MessageExtBrokerInner msgInner = buildMessage(); msgInner.setTopic(topic); msgInner.setQueueId(queueId);