From 7c9a1fc8a7387a6a63fc7e5237967f8d1c65b1ef Mon Sep 17 00:00:00 2001 From: LetLetMe Date: Thu, 8 Aug 2024 11:34:24 +0800 Subject: [PATCH 1/4] =?UTF-8?q?cq=E5=8F=8C=E5=86=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rocketmq/broker/BrokerController.java | 6 +- .../broker/offset/ConsumerOffsetManager.java | 20 +++ .../offset/RocksDBConsumerOffsetManager.java | 77 ++++++++- .../processor/AdminBrokerProcessor.java | 92 ++++++++++- .../RocksDBSubscriptionGroupManager.java | 36 ++-- .../SubscriptionGroupManager.java | 20 +++ .../topic/RocksDBTopicConfigManager.java | 26 +-- .../broker/topic/TopicConfigManager.java | 20 +++ .../RocksdbTransferOffsetAndCqTest.java | 154 ++++++++++++++++++ .../rocketmq/client/impl/MQClientAPIImpl.java | 15 ++ .../common/config/AbstractRocksDBStorage.java | 23 +-- .../remoting/protocol/RequestCode.java | 1 + ...eckRocksdbCqWriteProgressResponseBody.java | 35 ++++ ...ckRocksdbCqWriteProgressRequestHeader.java | 47 ++++++ .../rocketmq/store/DefaultMessageStore.java | 42 ++++- .../rocketmq/store/RocksDBMessageStore.java | 34 +++- .../store/config/MessageStoreConfig.java | 31 ++++ .../apache/rocketmq/store/queue/CqUnit.java | 1 + .../store/queue/RocksDBConsumeQueue.java | 3 +- .../store/queue/RocksDBConsumeQueueStore.java | 10 +- .../tools/admin/DefaultMQAdminExt.java | 7 + .../tools/admin/DefaultMQAdminExtImpl.java | 7 + .../rocketmq/tools/admin/MQAdminExt.java | 3 + .../ExportMetadataInRocksDBCommand.java | 4 +- .../CheckRocksdbCqWriteProgressCommand.java | 97 +++++++++++ 25 files changed, 745 insertions(+), 66 deletions(-) create mode 100644 broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java create mode 100644 remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/CheckRocksdbCqWriteProgressResponseBody.java create mode 100644 remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckRocksdbCqWriteProgressRequestHeader.java create mode 100644 tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 22ac7fedf1c..aaf06caddf8 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -18,7 +18,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; -import java.io.IOException; import java.net.InetSocketAddress; import java.util.AbstractMap; import java.util.ArrayList; @@ -789,6 +788,9 @@ public boolean initializeMessageStore() { defaultMessageStore = new RocksDBMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig, topicConfigManager.getTopicConfigTable()); } else { defaultMessageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig, topicConfigManager.getTopicConfigTable()); + if (messageStoreConfig.isRocksdbCQDoubleWriteEnable()) { + defaultMessageStore.enableRocksdbCQWrite(); + } } if (messageStoreConfig.isEnableDLegerCommitLog()) { @@ -812,7 +814,7 @@ public boolean initializeMessageStore() { this.timerMessageStore.registerEscapeBridgeHook(msg -> escapeBridge.putMessage(msg)); this.messageStore.setTimerMessageStore(this.timerMessageStore); } - } catch (IOException e) { + } catch (Exception e) { result = false; LOG.error("BrokerController#initialize: unexpected error occurs", e); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java index 21f20dde325..d3874d91d51 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java @@ -31,6 +31,7 @@ import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerPathConfigHelper; import org.apache.rocketmq.common.ConfigManager; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.org.slf4j.Logger; @@ -373,6 +374,25 @@ public void setDataVersion(DataVersion dataVersion) { this.dataVersion = dataVersion; } + public boolean loadDataVersion() { + String fileName = null; + try { + fileName = this.configFilePath(); + String jsonString = MixAll.file2String(fileName); + if (jsonString != null) { + ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class); + if (obj != null) { + this.dataVersion = obj.dataVersion; + } + LOG.info("load consumer offset dataVersion success, " + fileName + " " + jsonString); + } + return true; + } catch (Exception e) { + LOG.error("load consumer offset dataVersion failed " + fileName, e); + return false; + } + } + public void removeOffset(final String group) { Iterator>> it = this.offsetTable.entrySet().iterator(); while (it.hasNext()) { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java index de293fc4992..1e7cda71eed 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java @@ -16,26 +16,31 @@ */ package org.apache.rocketmq.broker.offset; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.serializer.SerializerFeature; import java.io.File; import java.util.Iterator; import java.util.Map.Entry; import java.util.concurrent.ConcurrentMap; - import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.RocksDBConfigManager; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.utils.DataConverter; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.apache.rocketmq.remoting.protocol.DataVersion; import org.rocksdb.WriteBatch; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.serializer.SerializerFeature; - public class RocksDBConsumerOffsetManager extends ConsumerOffsetManager { + protected static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + protected RocksDBConfigManager rocksDBConfigManager; public RocksDBConsumerOffsetManager(BrokerController brokerController) { super(brokerController); - this.rocksDBConfigManager = new RocksDBConfigManager(configFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs()); + this.rocksDBConfigManager = new RocksDBConfigManager(rocksdbConfigFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs()); } @Override @@ -43,9 +48,47 @@ public boolean load() { if (!rocksDBConfigManager.init()) { return false; } - return this.rocksDBConfigManager.loadData(this::decodeOffset); + if (!loadDataVersion() || !loadConsumerOffset()) { + return false; + } + + return true; + } + + public boolean loadConsumerOffset() { + return this.rocksDBConfigManager.loadData(this::decodeOffset) && merge(); + } + + private boolean merge() { + if (!brokerController.getMessageStoreConfig().isTransferOffsetJsonToRocksdb()) { + log.info("the switch transferOffsetJsonToRocksdb is off, no merge offset operation is needed."); + return true; + } + if (!UtilAll.isPathExists(this.configFilePath()) && !UtilAll.isPathExists(this.configFilePath() + ".bak")) { + log.info("consumerOffset json file does not exist, so skip merge"); + return true; + } + if (!super.loadDataVersion()) { + log.error("load json consumerOffset dataVersion error, startup will exit"); + return false; + } + + final DataVersion dataVersion = super.getDataVersion(); + final DataVersion kvDataVersion = this.getDataVersion(); + if (dataVersion.getCounter().get() > kvDataVersion.getCounter().get()) { + if (!super.load()) { + log.error("load json consumerOffset info failed, startup will exit"); + return false; + } + this.persist(); + this.getDataVersion().assignNewOne(dataVersion); + updateDataVersion(); + log.info("update offset from json, dataVersion:{}, offsetTable: {} ", this.getDataVersion(), JSON.toJSONString(this.getOffsetTable())); + } + return true; } + @Override public boolean stop() { return this.rocksDBConfigManager.stop(); @@ -69,8 +112,7 @@ protected void decodeOffset(final byte[] key, final byte[] body) { LOG.info("load exist local offset, {}, {}", topicAtGroup, wrapper.getOffsetTable()); } - @Override - public String configFilePath() { + public String rocksdbConfigFilePath() { return this.brokerController.getMessageStoreConfig().getStorePathRootDir() + File.separator + "config" + File.separator + "consumerOffsets" + File.separator; } @@ -103,4 +145,23 @@ private void putWriteBatch(final WriteBatch writeBatch, final String topicGroupN byte[] valueBytes = JSON.toJSONBytes(wrapper, SerializerFeature.BrowserCompatible); writeBatch.put(keyBytes, valueBytes); } + + @Override + public boolean loadDataVersion() { + return this.rocksDBConfigManager.loadDataVersion(); + } + + @Override + public DataVersion getDataVersion() { + return rocksDBConfigManager.getKvDataVersion(); + } + + public void updateDataVersion() { + try { + rocksDBConfigManager.updateKvDataVersion(); + } catch (Exception e) { + log.error("update consumer offset dataVersion error", e); + throw new RuntimeException(e); + } + } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 28bd2549145..863f16e515e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -18,9 +18,11 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; +import io.opentelemetry.api.common.Attributes; import java.io.UnsupportedEncodingException; import java.net.UnknownHostException; import java.nio.charset.StandardCharsets; @@ -38,7 +40,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import io.opentelemetry.api.common.Attributes; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.acl.AccessValidator; @@ -69,6 +70,7 @@ import org.apache.rocketmq.common.LockCallback; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.PlainAccessConfig; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.UnlockCallback; @@ -137,6 +139,7 @@ import org.apache.rocketmq.remoting.protocol.body.TopicList; import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody; import org.apache.rocketmq.remoting.protocol.body.UserInfo; +import org.apache.rocketmq.remoting.protocol.header.CheckRocksdbCqWriteProgressRequestHeader; import org.apache.rocketmq.remoting.protocol.header.CloneGroupOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResultRequestHeader; import org.apache.rocketmq.remoting.protocol.header.CreateAccessConfigRequestHeader; @@ -209,6 +212,7 @@ import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageStatus; +import org.apache.rocketmq.store.RocksDBMessageStore; import org.apache.rocketmq.store.SelectMappedBufferResult; import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.queue.ConsumeQueueInterface; @@ -217,8 +221,9 @@ import org.apache.rocketmq.store.timer.TimerCheckpoint; import org.apache.rocketmq.store.timer.TimerMessageStore; import org.apache.rocketmq.store.util.LibC; -import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM; + import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_INVOCATION_STATUS; +import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM; import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse; public class AdminBrokerProcessor implements NettyRequestProcessor { @@ -339,6 +344,8 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, return fetchAllConsumeStatsInBroker(ctx, request); case RequestCode.QUERY_CONSUME_QUEUE: return queryConsumeQueue(ctx, request); + case RequestCode.CHECK_ROCKSDB_CQ_WRITE_PROGRESS: + return this.checkRocksdbCqWriteProgress(ctx, request); case RequestCode.UPDATE_AND_GET_GROUP_FORBIDDEN: return this.updateAndGetGroupForbidden(ctx, request); case RequestCode.GET_SUBSCRIPTIONGROUP_CONFIG: @@ -458,6 +465,71 @@ private RemotingCommand updateAndGetGroupForbidden(ChannelHandlerContext ctx, Re return response; } + private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + CheckRocksdbCqWriteProgressRequestHeader requestHeader = request.decodeCommandCustomHeader(CheckRocksdbCqWriteProgressRequestHeader.class); + String requestTopic = requestHeader.getTopic(); + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + response.setCode(ResponseCode.SUCCESS); + + DefaultMessageStore messageStore = (DefaultMessageStore) brokerController.getMessageStore(); + RocksDBMessageStore rocksDBMessageStore = messageStore.getRocksDBMessageStore(); + if (!messageStore.getMessageStoreConfig().isRocksdbCQDoubleWriteEnable()) { + response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", "rocksdbCQWriteEnable is false, checkRocksdbCqWriteProgressCommand is invalid"))); + return response; + } + + ConcurrentMap> cqTable = messageStore.getConsumeQueueTable(); + StringBuilder diffResult = new StringBuilder("check success, all is ok!\n"); + try { + if (StringUtils.isNotBlank(requestTopic)) { + processConsumeQueuesForTopic(cqTable.get(requestTopic), requestTopic, rocksDBMessageStore, diffResult,false); + response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", diffResult.toString()))); + return response; + } + for (Map.Entry> topicEntry : cqTable.entrySet()) { + String topic = topicEntry.getKey(); + processConsumeQueuesForTopic(topicEntry.getValue(), topic, rocksDBMessageStore, diffResult,true); + } + diffResult.append("check all topic successful, size:").append(cqTable.size()); + response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", diffResult.toString()))); + + } catch (Exception e) { + LOGGER.error("CheckRocksdbCqWriteProgressCommand error", e); + response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", e.getMessage()))); + } + return response; + } + + private void processConsumeQueuesForTopic(ConcurrentMap queueMap, String topic, RocksDBMessageStore rocksDBMessageStore, StringBuilder diffResult, boolean checkAll) { + for (Map.Entry queueEntry : queueMap.entrySet()) { + Integer queueId = queueEntry.getKey(); + ConsumeQueueInterface jsonCq = queueEntry.getValue(); + ConsumeQueueInterface kvCq = rocksDBMessageStore.getConsumeQueue(topic, queueId); + if (!checkAll) { + String format = String.format("\n[topic: %s, queue: %s] \n kvEarliest : %s | kvLatest : %s \n fileEarliest: %s | fileEarliest: %s ", + topic, queueId, kvCq.getEarliestUnit(), kvCq.getLatestUnit(), jsonCq.getEarliestUnit(), jsonCq.getLatestUnit()); + diffResult.append(format).append("\n"); + } + long maxFileOffsetInQueue = jsonCq.getMaxOffsetInQueue(); + long minOffsetInQueue = kvCq.getMinOffsetInQueue(); + for (long i = minOffsetInQueue; i < maxFileOffsetInQueue; i++) { + Pair fileCqUnit = jsonCq.getCqUnitAndStoreTime(i); + Pair kvCqUnit = kvCq.getCqUnitAndStoreTime(i); + if (fileCqUnit == null || kvCqUnit == null) { + diffResult.append(String.format("[topic: %s, queue: %s, offset: %s] \n kv : %s \n file: %s \n", + topic, queueId, i, kvCqUnit != null ? kvCqUnit.getObject1() : "null", fileCqUnit != null ? fileCqUnit.getObject1() : "null")); + return; + } + if (!checkCqUnitEqual(kvCqUnit.getObject1(), fileCqUnit.getObject1())) { + String diffInfo = String.format("[topic:%s, queue: %s offset: %s] \n file: %s \n kv: %s", + topic, queueId, i, kvCqUnit.getObject1(), fileCqUnit.getObject1()); + LOGGER.error(diffInfo); + diffResult.append(diffInfo).append("\n"); + return; + } + } + } + } @Override public boolean rejectRequest() { return false; @@ -3305,4 +3377,20 @@ private boolean validateBlackListConfigExist(Properties properties) { } return false; } + + private boolean checkCqUnitEqual(CqUnit cqUnit1, CqUnit cqUnit2) { + if (cqUnit1.getQueueOffset() != cqUnit2.getQueueOffset()) { + return false; + } + if (cqUnit1.getSize() != cqUnit2.getSize()) { + return false; + } + if (cqUnit1.getPos() != cqUnit2.getPos()) { + return false; + } + if (cqUnit1.getBatchNum() != cqUnit2.getBatchNum()) { + return false; + } + return cqUnit1.getTagsCode() == cqUnit2.getTagsCode(); + } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java index 7df72dbe686..5119f78672c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java @@ -19,6 +19,12 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.serializer.SerializerFeature; +import java.io.File; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.BiConsumer; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.RocksDBConfigManager; import org.apache.rocketmq.common.UtilAll; @@ -27,13 +33,6 @@ import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; import org.rocksdb.RocksIterator; -import java.io.File; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.function.BiConsumer; - public class RocksDBSubscriptionGroupManager extends SubscriptionGroupManager { protected RocksDBConfigManager rocksDBConfigManager; @@ -79,28 +78,30 @@ public boolean loadForbidden(BiConsumer biConsumer) { private boolean merge() { if (!brokerController.getMessageStoreConfig().isTransferMetadataJsonToRocksdb()) { - log.info("The switch is off, no merge operation is needed."); + log.info("the switch transferMetadataJsonToRocksdb is off, no merge subGroup operation is needed."); return true; } if (!UtilAll.isPathExists(this.configFilePath()) && !UtilAll.isPathExists(this.configFilePath() + ".bak")) { - log.info("json file and json back file not exist, so skip merge"); + log.info("subGroup json file does not exist, so skip merge"); return true; } - - if (!super.load()) { - log.error("load group and forbidden info from json file error, startup will exit"); + if (!super.loadDataVersion()) { + log.error("load json subGroup dataVersion error, startup will exit"); return false; } - - final ConcurrentMap groupTable = this.getSubscriptionGroupTable(); - final ConcurrentMap> forbiddenTable = this.getForbiddenTable(); final DataVersion dataVersion = super.getDataVersion(); final DataVersion kvDataVersion = this.getDataVersion(); if (dataVersion.getCounter().get() > kvDataVersion.getCounter().get()) { + if (!super.load()) { + log.error("load group and forbidden info from json file error, startup will exit"); + return false; + } + final ConcurrentMap groupTable = this.getSubscriptionGroupTable(); for (Map.Entry entry : groupTable.entrySet()) { putSubscriptionGroupConfig(entry.getValue()); log.info("import subscription config to rocksdb, group={}", entry.getValue()); } + final ConcurrentMap> forbiddenTable = this.getForbiddenTable(); for (Map.Entry> entry : forbiddenTable.entrySet()) { try { this.rocksDBConfigManager.updateForbidden(entry.getKey(), JSON.toJSONString(entry.getValue())); @@ -110,8 +111,10 @@ private boolean merge() { return false; } } - this.rocksDBConfigManager.getKvDataVersion().assignNewOne(dataVersion); + this.getDataVersion().assignNewOne(dataVersion); updateDataVersion(); + } else { + log.info("dataVersion is not greater than kvDataVersion, no need to merge group metaData, dataVersion={}, kvDataVersion={}", dataVersion, kvDataVersion); } log.info("finish marge subscription config from json file and merge to rocksdb"); this.persist(); @@ -196,6 +199,7 @@ public void updateDataVersion() { try { rocksDBConfigManager.updateKvDataVersion(); } catch (Exception e) { + log.error("update group config dataVersion error", e); throw new RuntimeException(e); } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java index f2a7e0482b1..e6855ef9a2a 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java @@ -334,6 +334,26 @@ public DataVersion getDataVersion() { return dataVersion; } + public boolean loadDataVersion() { + String fileName = null; + try { + fileName = this.configFilePath(); + String jsonString = MixAll.file2String(fileName); + if (jsonString != null) { + SubscriptionGroupManager obj = RemotingSerializable.fromJson(jsonString, SubscriptionGroupManager.class); + if (obj != null) { + this.dataVersion.assignNewOne(obj.dataVersion); + this.printLoadDataWhenFirstBoot(obj); + log.info("load subGroup dataVersion success,{},{}", fileName, obj.dataVersion); + } + } + return true; + } catch (Exception e) { + log.error("load subGroup dataVersion failed" + fileName, e); + return false; + } + } + public void deleteSubscriptionGroupConfig(final String groupName) { SubscriptionGroupConfig old = removeSubscriptionGroupConfig(groupName); this.forbiddenTable.remove(groupName); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java index 2a89dd7e024..466e6416f98 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java @@ -18,6 +18,9 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.serializer.SerializerFeature; +import java.io.File; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.RocksDBConfigManager; import org.apache.rocketmq.common.TopicConfig; @@ -25,10 +28,6 @@ import org.apache.rocketmq.common.utils.DataConverter; import org.apache.rocketmq.remoting.protocol.DataVersion; -import java.io.File; -import java.util.Map; -import java.util.concurrent.ConcurrentMap; - public class RocksDBTopicConfigManager extends TopicConfigManager { protected RocksDBConfigManager rocksDBConfigManager; @@ -60,29 +59,35 @@ public boolean loadDataVersion() { private boolean merge() { if (!brokerController.getMessageStoreConfig().isTransferMetadataJsonToRocksdb()) { - log.info("The switch is off, no merge operation is needed."); + log.info("the switch transferMetadataJsonToRocksdb is off, no merge topic operation is needed."); return true; } if (!UtilAll.isPathExists(this.configFilePath()) && !UtilAll.isPathExists(this.configFilePath() + ".bak")) { - log.info("json file and json back file not exist, so skip merge"); + log.info("topic json file does not exist, so skip merge"); return true; } - if (!super.load()) { - log.error("load topic config from json file error, startup will exit"); + if (!super.loadDataVersion()) { + log.error("load json topic dataVersion error, startup will exit"); return false; } - final ConcurrentMap topicConfigTable = this.getTopicConfigTable(); final DataVersion dataVersion = super.getDataVersion(); final DataVersion kvDataVersion = this.getDataVersion(); if (dataVersion.getCounter().get() > kvDataVersion.getCounter().get()) { + if (!super.load()) { + log.error("load topic config from json file error, startup will exit"); + return false; + } + final ConcurrentMap topicConfigTable = this.getTopicConfigTable(); for (Map.Entry entry : topicConfigTable.entrySet()) { putTopicConfig(entry.getValue()); log.info("import topic config to rocksdb, topic={}", entry.getValue()); } - this.rocksDBConfigManager.getKvDataVersion().assignNewOne(dataVersion); + this.getDataVersion().assignNewOne(dataVersion); updateDataVersion(); + } else { + log.info("dataVersion is not greater than kvDataVersion, no need to merge topic metaData, dataVersion={}, kvDataVersion={}", dataVersion, kvDataVersion); } log.info("finish read topic config from json file and merge to rocksdb"); this.persist(); @@ -150,6 +155,7 @@ public void updateDataVersion() { try { rocksDBConfigManager.updateKvDataVersion(); } catch (Exception e) { + log.error("update topic config dataVersion error", e); throw new RuntimeException(e); } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index eab2896b001..25d3218f2ab 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -637,6 +637,26 @@ public String encode() { return encode(false); } + public boolean loadDataVersion() { + String fileName = null; + try { + fileName = this.configFilePath(); + String jsonString = MixAll.file2String(fileName); + if (jsonString != null) { + TopicConfigSerializeWrapper topicConfigSerializeWrapper = + TopicConfigSerializeWrapper.fromJson(jsonString, TopicConfigSerializeWrapper.class); + if (topicConfigSerializeWrapper != null) { + this.dataVersion.assignNewOne(topicConfigSerializeWrapper.getDataVersion()); + log.info("load topic metadata dataVersion success {}, {}", fileName, topicConfigSerializeWrapper.getDataVersion()); + } + } + return true; + } catch (Exception e) { + log.error("load topic metadata dataVersion failed" + fileName, e); + return false; + } + } + @Override public String configFilePath() { return BrokerPathConfigHelper.getTopicConfigPath(this.brokerController.getMessageStoreConfig().getStorePathRootDir()); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java new file mode 100644 index 00000000000..b4800aec24e --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.broker.offset; + +import java.io.IOException; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.apache.commons.collections.MapUtils; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.Pair; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.store.DefaultMessageStore; +import org.apache.rocketmq.store.DispatchRequest; +import org.apache.rocketmq.store.RocksDBMessageStore; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.apache.rocketmq.store.queue.ConsumeQueueInterface; +import org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface; +import org.apache.rocketmq.store.queue.CqUnit; +import org.apache.rocketmq.store.stats.BrokerStatsManager; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import org.rocksdb.RocksDBException; + +@RunWith(MockitoJUnitRunner.class) +public class RocksdbTransferOffsetAndCqTest { + + private final String basePath = Paths.get(System.getProperty("user.home"), + "unit-test-store", UUID.randomUUID().toString().substring(0, 16).toUpperCase()).toString(); + + private final String topic = "topic"; + private final String group = "group"; + private final String clientHost = "clientHost"; + private final int queueId = 1; + + private RocksDBConsumerOffsetManager rocksdbConsumerOffsetManager; + + private ConsumerOffsetManager consumerOffsetManager; + + private DefaultMessageStore defaultMessageStore; + + @Mock + private BrokerController brokerController; + + @Before + public void init() throws IOException { + if (notToBeExecuted()) { + return; + } + BrokerConfig brokerConfig = new BrokerConfig(); + brokerConfig.setConsumerOffsetUpdateVersionStep(10); + MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + messageStoreConfig.setStorePathRootDir(basePath); + messageStoreConfig.setTransferOffsetJsonToRocksdb(true); + messageStoreConfig.setRocksdbCQDoubleWriteEnable(true); + Mockito.lenient().when(brokerController.getBrokerConfig()).thenReturn(brokerConfig); + Mockito.lenient().when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig); + + defaultMessageStore = new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("aaa", true), null, + brokerConfig, new ConcurrentHashMap()); + defaultMessageStore.enableRocksdbCQWrite(); + defaultMessageStore.loadCheckPoint(); + + consumerOffsetManager = new ConsumerOffsetManager(brokerController); + consumerOffsetManager.load(); + + rocksdbConsumerOffsetManager = new RocksDBConsumerOffsetManager(brokerController); + } + + @Test + public void testTransferOffset() { + if (notToBeExecuted()) { + return; + } + + for (int i = 0; i < 200; i++) { + consumerOffsetManager.commitOffset(clientHost, group, topic, queueId, i); + } + + ConcurrentMap> offsetTable = consumerOffsetManager.getOffsetTable(); + ConcurrentMap map = offsetTable.get(topic + "@" + group); + Assert.assertTrue(MapUtils.isNotEmpty(map)); + + Long offset = map.get(queueId); + Assert.assertEquals(199L, (long) offset); + + long offsetDataVersion = consumerOffsetManager.getDataVersion().getCounter().get(); + Assert.assertEquals(20L, offsetDataVersion); + + consumerOffsetManager.persist(); + + boolean loadResult = rocksdbConsumerOffsetManager.load(); + Assert.assertTrue(loadResult); + + ConcurrentMap> rocksdbOffsetTable = rocksdbConsumerOffsetManager.getOffsetTable(); + + ConcurrentMap rocksdbMap = rocksdbOffsetTable.get(topic + "@" + group); + Assert.assertTrue(MapUtils.isNotEmpty(rocksdbMap)); + + Long aLong1 = rocksdbMap.get(queueId); + Assert.assertEquals(199L, (long) aLong1); + + long rocksdbOffset = rocksdbConsumerOffsetManager.getDataVersion().getCounter().get(); + Assert.assertEquals(21L, rocksdbOffset); + } + + @Test + public void testRocksdbCqWrite() throws RocksDBException { + if (notToBeExecuted()) { + return; + } + RocksDBMessageStore kvStore = defaultMessageStore.getRocksDBMessageStore(); + ConsumeQueueStoreInterface store = kvStore.getConsumeQueueStore(); + ConsumeQueueInterface rocksdbCq = defaultMessageStore.getRocksDBMessageStore().findConsumeQueue(topic, queueId); + ConsumeQueueInterface fileCq = defaultMessageStore.findConsumeQueue(topic, queueId); + for (int i = 0; i < 200; i++) { + DispatchRequest request = new DispatchRequest(topic, queueId, i, 200, 0, System.currentTimeMillis(), i, "", "", 0, 0, new HashMap<>()); + fileCq.putMessagePositionInfoWrapper(request); + store.putMessagePositionInfoWrapper(request); + } + Pair unit = rocksdbCq.getCqUnitAndStoreTime(100); + Pair unit1 = fileCq.getCqUnitAndStoreTime(100); + Assert.assertTrue(unit.getObject1().getPos() == unit1.getObject1().getPos()); + } + + private boolean notToBeExecuted() { + return MixAll.isMac(); + } + +} diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index b539b8f098a..0a45f096235 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -113,6 +113,7 @@ import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo; import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData; import org.apache.rocketmq.remoting.protocol.body.CheckClientRequestBody; +import org.apache.rocketmq.remoting.protocol.body.CheckRocksdbCqWriteProgressResponseBody; import org.apache.rocketmq.remoting.protocol.body.ClusterAclVersionInfo; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult; @@ -148,6 +149,7 @@ import org.apache.rocketmq.remoting.protocol.header.AddBrokerRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ChangeInvisibleTimeRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ChangeInvisibleTimeResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.CheckRocksdbCqWriteProgressRequestHeader; import org.apache.rocketmq.remoting.protocol.header.CloneGroupOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResultRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ConsumerSendMsgBackRequestHeader; @@ -3017,6 +3019,19 @@ public QueryConsumeQueueResponseBody queryConsumeQueue(final String brokerAddr, throw new MQClientException(response.getCode(), response.getRemark()); } + public CheckRocksdbCqWriteProgressResponseBody checkRocksdbCqWriteProgress(final String brokerAddr, final String topic, final long timeoutMillis) throws InterruptedException, + RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException { + CheckRocksdbCqWriteProgressRequestHeader header = new CheckRocksdbCqWriteProgressRequestHeader(); + header.setTopic(topic); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHECK_ROCKSDB_CQ_WRITE_PROGRESS, header); + RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, timeoutMillis); + assert response != null; + if (ResponseCode.SUCCESS == response.getCode()) { + return CheckRocksdbCqWriteProgressResponseBody.decode(response.getBody(), CheckRocksdbCqWriteProgressResponseBody.class); + } + throw new MQClientException(response.getCode(), response.getRemark()); + } + public void checkClientInBroker(final String brokerAddr, final String consumerGroup, final String clientId, final SubscriptionData subscriptionData, final long timeoutMillis) diff --git a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java index f88b8e198bf..13522889bb3 100644 --- a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java +++ b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java @@ -17,6 +17,15 @@ package org.apache.rocketmq.common.config; import com.google.common.collect.Maps; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.utils.DataConverter; @@ -40,16 +49,6 @@ import org.rocksdb.WriteBatch; import org.rocksdb.WriteOptions; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.Semaphore; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - import static org.rocksdb.RocksDB.NOT_FOUND; public abstract class AbstractRocksDBStorage { @@ -495,7 +494,9 @@ public void statRocksdb(Logger logger) { String blocksPinnedByIteratorMemUsage = this.db.getProperty("rocksdb.block-cache-pinned-usage"); logger.info("MemUsage. blockCache: {}, indexesAndFilterBlock: {}, memtable: {}, blocksPinnedByIterator: {}", blockCacheMemUsage, indexesAndFilterBlockMemUsage, memTableMemUsage, blocksPinnedByIteratorMemUsage); - } catch (Exception ignored) { + } catch (Exception e) { + logger.error("statRocksdb Failed. {}", this.dbPath, e); + throw new RuntimeException(e); } } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java index f45ff6fa484..cfc5cc22785 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java @@ -217,6 +217,7 @@ public class RequestCode { public static final int GET_SUBSCRIPTIONGROUP_CONFIG = 352; public static final int UPDATE_AND_GET_GROUP_FORBIDDEN = 353; + public static final int CHECK_ROCKSDB_CQ_WRITE_PROGRESS = 354; public static final int LITE_PULL_MESSAGE = 361; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/CheckRocksdbCqWriteProgressResponseBody.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/CheckRocksdbCqWriteProgressResponseBody.java new file mode 100644 index 00000000000..76719ac1a24 --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/CheckRocksdbCqWriteProgressResponseBody.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.protocol.body; + +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +public class CheckRocksdbCqWriteProgressResponseBody extends RemotingSerializable { + + String diffResult; + + public String getDiffResult() { + return diffResult; + } + + public void setDiffResult(String diffResult) { + this.diffResult = diffResult; + } + + +} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckRocksdbCqWriteProgressRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckRocksdbCqWriteProgressRequestHeader.java new file mode 100644 index 00000000000..fee158b4976 --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckRocksdbCqWriteProgressRequestHeader.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.remoting.protocol.header; + +import org.apache.rocketmq.common.action.Action; +import org.apache.rocketmq.common.action.RocketMQAction; +import org.apache.rocketmq.common.resource.ResourceType; +import org.apache.rocketmq.common.resource.RocketMQResource; +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.protocol.RequestCode; + +@RocketMQAction(value = RequestCode.CHECK_ROCKSDB_CQ_WRITE_PROGRESS, action = Action.GET) +public class CheckRocksdbCqWriteProgressRequestHeader implements CommandCustomHeader { + + @CFNotNull + @RocketMQResource(ResourceType.TOPIC) + private String topic; + + @Override + public void checkFields() throws RemotingCommandException { + + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } +} diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 8f564d5bc14..8b46c7f5ce4 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -163,11 +163,13 @@ public class DefaultMessageStore implements MessageStore { private volatile boolean shutdown = true; protected boolean notifyMessageArriveInBatch = false; - private StoreCheckpoint storeCheckpoint; + protected StoreCheckpoint storeCheckpoint; private TimerMessageStore timerMessageStore; private final LinkedList dispatcherList; + private RocksDBMessageStore rocksDBMessageStore; + private RandomAccessFile lockFile; private FileLock lock; @@ -354,12 +356,7 @@ public boolean load() { } if (result) { - this.storeCheckpoint = - new StoreCheckpoint( - StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir())); - this.masterFlushedOffset = this.storeCheckpoint.getMasterFlushedOffset(); - setConfirmOffset(this.storeCheckpoint.getConfirmPhyOffset()); - + loadCheckPoint(); result = this.indexService.load(lastExitOK); this.recover(lastExitOK); LOGGER.info("message store recover end, and the max phy offset = {}", this.getMaxPhyOffset()); @@ -381,6 +378,14 @@ public boolean load() { return result; } + public void loadCheckPoint() throws IOException { + this.storeCheckpoint = + new StoreCheckpoint( + StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir())); + this.masterFlushedOffset = this.storeCheckpoint.getMasterFlushedOffset(); + setConfirmOffset(this.storeCheckpoint.getConfirmPhyOffset()); + } + /** * @throws Exception */ @@ -511,6 +516,10 @@ public void shutdown() { this.compactionService.shutdown(); } + if (messageStoreConfig.isRocksdbCQDoubleWriteEnable()) { + this.rocksDBMessageStore.consumeQueueStore.shutdown(); + } + this.flushConsumeQueueService.shutdown(); this.allocateMappedFileService.shutdown(); this.storeCheckpoint.flush(); @@ -3251,6 +3260,17 @@ public HARuntimeInfo getHARuntimeInfo() { } } + public void enableRocksdbCQWrite() { + try { + RocksDBMessageStore store = new RocksDBMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig, this.topicConfigTable); + this.rocksDBMessageStore = store; + store.loadAndStartConsumerServiceOnly(); + addDispatcher(store.getDispatcherBuildRocksdbConsumeQueue()); + } catch (Exception e) { + LOGGER.error("enableRocksdbCqWrite error", e); + } + } + public int getMaxDelayLevel() { return maxDelayLevel; } @@ -3338,4 +3358,12 @@ public boolean isTransientStorePoolEnable() { public long getReputFromOffset() { return this.reputMessageService.getReputFromOffset(); } + + public RocksDBMessageStore getRocksDBMessageStore() { + return this.rocksDBMessageStore; + } + + public ConsumeQueueStoreInterface getConsumeQueueStore() { + return consumeQueueStore; + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java index 6141b778bf7..edb68894853 100644 --- a/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java @@ -16,13 +16,12 @@ */ package org.apache.rocketmq.store; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.metrics.Meter; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.function.Supplier; - -import io.opentelemetry.api.common.AttributesBuilder; -import io.opentelemetry.api.metrics.Meter; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.UtilAll; @@ -39,6 +38,8 @@ public class RocksDBMessageStore extends DefaultMessageStore { + private CommitLogDispatcherBuildRocksdbConsumeQueue dispatcherBuildRocksdbConsumeQueue; + public RocksDBMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager, final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig, final ConcurrentMap topicConfigTable) throws IOException { @@ -178,4 +179,31 @@ public void initMetrics(Meter meter, Supplier attributesBuild // Also add some metrics for rocksdb's monitoring. RocksDBStoreMetricsManager.init(meter, attributesBuilderSupplier, this); } + + public CommitLogDispatcherBuildRocksdbConsumeQueue getDispatcherBuildRocksdbConsumeQueue() { + return dispatcherBuildRocksdbConsumeQueue; + } + + class CommitLogDispatcherBuildRocksdbConsumeQueue implements CommitLogDispatcher { + @Override + public void dispatch(DispatchRequest request) throws RocksDBException { + putMessagePositionInfo(request); + } + } + + public void loadAndStartConsumerServiceOnly() { + try { + this.dispatcherBuildRocksdbConsumeQueue = new CommitLogDispatcherBuildRocksdbConsumeQueue(); + boolean loadResult = this.consumeQueueStore.load(); + if (!loadResult) { + throw new RuntimeException("load consume queue failed"); + } + super.loadCheckPoint(); + this.consumeQueueStore.start(); + } catch (Exception e) { + ERROR_LOG.error("loadAndStartConsumerServiceOnly error", e); + throw new RuntimeException(e); + } + } + } diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index 0b45d92418e..9c524aa5aea 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -424,6 +424,37 @@ public class MessageStoreConfig { private boolean putConsumeQueueDataByFileChannel = true; + private boolean transferOffsetJsonToRocksdb = false; + + private boolean rocksdbCQDoubleWriteEnable = false; + + private boolean enableBatchWriteKvCq = false; + + + public boolean isEnableBatchWriteKvCq() { + return enableBatchWriteKvCq; + } + + public void setEnableBatchWriteKvCq(boolean enableBatchWriteKvCq) { + this.enableBatchWriteKvCq = enableBatchWriteKvCq; + } + + public boolean isRocksdbCQDoubleWriteEnable() { + return rocksdbCQDoubleWriteEnable; + } + + public void setRocksdbCQDoubleWriteEnable(boolean rocksdbWriteEnable) { + this.rocksdbCQDoubleWriteEnable = rocksdbWriteEnable; + } + + public boolean isTransferOffsetJsonToRocksdb() { + return transferOffsetJsonToRocksdb; + } + + public void setTransferOffsetJsonToRocksdb(boolean transferOffsetJsonToRocksdb) { + this.transferOffsetJsonToRocksdb = transferOffsetJsonToRocksdb; + } + public boolean isEnabledAppendPropCRC() { return enabledAppendPropCRC; } diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/CqUnit.java b/store/src/main/java/org/apache/rocketmq/store/queue/CqUnit.java index b8865fd9195..34f5cb142b6 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/CqUnit.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/CqUnit.java @@ -109,6 +109,7 @@ public String toString() { ", size=" + size + ", pos=" + pos + ", batchNum=" + batchNum + + ", tagsCode=" + tagsCode + ", compactedOffset=" + compactedOffset + '}'; } diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java index 5a981bb4df1..2363c2896e5 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java @@ -18,7 +18,6 @@ import java.nio.ByteBuffer; import java.util.List; - import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.attribute.CQType; @@ -311,7 +310,7 @@ public CqUnit getEarliestUnit() { public CqUnit getLatestUnit() { try { long maxOffset = this.messageStore.getQueueStore().getMaxOffsetInQueue(topic, queueId); - return get(maxOffset); + return get(maxOffset > 0 ? maxOffset - 1 : maxOffset); } catch (RocksDBException e) { ERROR_LOG.error("getLatestUnit Failed. topic: {}, queueId: {}, {}", topic, queueId, e.getMessage()); } diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java index 3c6b91ec018..34c6d2f3956 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java @@ -28,7 +28,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; - import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.BoundaryType; @@ -78,6 +77,8 @@ public class RocksDBConsumeQueueStore extends AbstractConsumeQueueStore { private final Map> tempTopicQueueMaxOffsetMap; private volatile boolean isCQError = false; + private boolean enableBatchWriteKvCq; + public RocksDBConsumeQueueStore(DefaultMessageStore messageStore) { super(messageStore); @@ -87,6 +88,7 @@ public RocksDBConsumeQueueStore(DefaultMessageStore messageStore) { this.rocksDBConsumeQueueOffsetTable = new RocksDBConsumeQueueOffsetTable(rocksDBConsumeQueueTable, rocksDBStorage, messageStore); this.writeBatch = new WriteBatch(); + this.enableBatchWriteKvCq = messageStoreConfig.isEnableBatchWriteKvCq(); this.bufferDRList = new ArrayList(BATCH_SIZE); this.cqBBPairList = new ArrayList(BATCH_SIZE); this.offsetBBPairList = new ArrayList(BATCH_SIZE); @@ -164,12 +166,12 @@ private boolean shutdownInner() { @Override public void putMessagePositionInfoWrapper(DispatchRequest request) throws RocksDBException { - if (request == null || this.bufferDRList.size() >= BATCH_SIZE) { - putMessagePosition(); - } if (request != null) { this.bufferDRList.add(request); } + if (request == null || !enableBatchWriteKvCq || this.bufferDRList.size() >= BATCH_SIZE) { + putMessagePosition(); + } } public void putMessagePosition() throws RocksDBException { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index 6ebee1d0dd1..3686bf2644b 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -52,6 +52,7 @@ import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList; import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.remoting.protocol.body.CheckRocksdbCqWriteProgressResponseBody; import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache; import org.apache.rocketmq.remoting.protocol.body.GroupList; import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo; @@ -771,6 +772,12 @@ public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String ); } + @Override + public CheckRocksdbCqWriteProgressResponseBody checkRocksdbCqWriteProgress(String brokerAddr, String topic) + throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException { + return this.defaultMQAdminExtImpl.checkRocksdbCqWriteProgress(brokerAddr, topic); + } + @Override public boolean resumeCheckHalfMessage(String topic, String msgId) diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index dc4d35e7049..883dcbe41d7 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -90,6 +90,7 @@ import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList; import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.remoting.protocol.body.CheckRocksdbCqWriteProgressResponseBody; import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache; import org.apache.rocketmq.remoting.protocol.body.GroupList; import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo; @@ -1817,6 +1818,12 @@ public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String return this.mqClientInstance.getMQClientAPIImpl().queryConsumeQueue(brokerAddr, topic, queueId, index, count, consumerGroup, timeoutMillis); } + @Override + public CheckRocksdbCqWriteProgressResponseBody checkRocksdbCqWriteProgress(String brokerAddr, String topic) + throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException { + return this.mqClientInstance.getMQClientAPIImpl().checkRocksdbCqWriteProgress(brokerAddr, topic, timeoutMillis); + } + @Override public boolean resumeCheckHalfMessage(final String topic, final String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index ff78f22c704..09204ab7be2 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -48,6 +48,7 @@ import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList; import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.remoting.protocol.body.CheckRocksdbCqWriteProgressResponseBody; import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache; import org.apache.rocketmq.remoting.protocol.body.GroupList; import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo; @@ -148,6 +149,8 @@ ConsumeStats examineConsumeStats( final String consumerGroup) throws RemotingException, MQClientException, InterruptedException, MQBrokerException; + CheckRocksdbCqWriteProgressResponseBody checkRocksdbCqWriteProgress(String brokerAddr, String topic) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException; + ConsumeStats examineConsumeStats(final String consumerGroup, final String topic) throws RemotingException, MQClientException, InterruptedException, MQBrokerException; diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java index 1ecb1fa2cd9..c466490b8a8 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataInRocksDBCommand.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.rocketmq.tools.command.export; import com.alibaba.fastjson.JSONObject; @@ -77,6 +78,7 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t } String configType = commandLine.getOptionValue("configType").trim().toLowerCase(); + path += "/" + configType; boolean jsonEnable = false; if (commandLine.hasOption("jsonEnable")) { @@ -86,7 +88,7 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t ConfigRocksDBStorage kvStore = new ConfigRocksDBStorage(path, true /* readOnly */); if (!kvStore.start()) { - System.out.print("RocksDB load error, path=" + path + "\n"); + System.out.printf("RocksDB load error, path=%s\n" , path); return; } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java new file mode 100644 index 00000000000..82dcb741962 --- /dev/null +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.tools.command.queue; + +import java.util.Map; +import java.util.Set; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.protocol.body.CheckRocksdbCqWriteProgressResponseBody; +import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; +import org.apache.rocketmq.remoting.protocol.route.BrokerData; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.command.SubCommand; + +public class CheckRocksdbCqWriteProgressCommand implements SubCommand { + + @Override + public String commandName() { + return "checkRocksdbCqWriteProgressCommandCommand"; + } + + @Override + public String commandDesc() { + return "check if rocksdb cq is same as file cq"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("c", "cluster", true, "cluster name"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("n", "nameserverAddr", true, "nameserverAddr"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("t", "topic", true, "topic name"); + opt.setRequired(false); + options.addOption(opt); + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + defaultMQAdminExt.setNamesrvAddr(StringUtils.trim(commandLine.getOptionValue('n'))); + String clusterName = commandLine.hasOption('c') ? commandLine.getOptionValue('c').trim() : ""; + String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : ""; + + try { + defaultMQAdminExt.start(); + ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); + Map> clusterAddrTable = clusterInfo.getClusterAddrTable(); + Map brokerAddrTable = clusterInfo.getBrokerAddrTable(); + if (clusterAddrTable.get(clusterName) == null) { + System.out.print("clusterAddrTable is empty"); + return; + } + for (Map.Entry entry : brokerAddrTable.entrySet()) { + String brokerName = entry.getKey(); + BrokerData brokerData = entry.getValue(); + String brokerAddr = brokerData.getBrokerAddrs().get(0L); + CheckRocksdbCqWriteProgressResponseBody body = defaultMQAdminExt.checkRocksdbCqWriteProgress(brokerAddr, topic); + if (StringUtils.isNotBlank(topic)) { + System.out.printf(body.getDiffResult()); + } else { + System.out.printf(brokerName + " | " + brokerAddr + " | " + body.getDiffResult()); + } + } + + } catch (Exception e) { + throw new RuntimeException(this.getClass().getSimpleName() + " command failed", e); + } finally { + defaultMQAdminExt.shutdown(); + } + } +} From c09c74a3525d81683922354f495c35d15db110c4 Mon Sep 17 00:00:00 2001 From: LetLetMe Date: Thu, 12 Sep 2024 14:46:35 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dpr=E4=B8=AD=E6=8F=90?= =?UTF-8?q?=E5=87=BA=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/apache/rocketmq/store/config/MessageStoreConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index 9c524aa5aea..c077831f3c4 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -428,7 +428,7 @@ public class MessageStoreConfig { private boolean rocksdbCQDoubleWriteEnable = false; - private boolean enableBatchWriteKvCq = false; + private boolean enableBatchWriteKvCq = true; public boolean isEnableBatchWriteKvCq() { From e48d68d1f3777585e8c9ecd298f4c240a4255d39 Mon Sep 17 00:00:00 2001 From: LetLetMe Date: Mon, 9 Sep 2024 16:42:04 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dpr=E4=B8=AD=E6=8F=90?= =?UTF-8?q?=E5=87=BA=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../apache/rocketmq/store/RocksDBMessageStore.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java index edb68894853..90df7aed596 100644 --- a/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java @@ -25,6 +25,7 @@ import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.config.StorePathConfigHelper; import org.apache.rocketmq.store.metrics.DefaultStoreMetricsManager; @@ -187,7 +188,16 @@ public CommitLogDispatcherBuildRocksdbConsumeQueue getDispatcherBuildRocksdbCons class CommitLogDispatcherBuildRocksdbConsumeQueue implements CommitLogDispatcher { @Override public void dispatch(DispatchRequest request) throws RocksDBException { - putMessagePositionInfo(request); + final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag()); + switch (tranType) { + case MessageSysFlag.TRANSACTION_NOT_TYPE: + case MessageSysFlag.TRANSACTION_COMMIT_TYPE: + putMessagePositionInfo(request); + break; + case MessageSysFlag.TRANSACTION_PREPARED_TYPE: + case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: + break; + } } } From 7c87d9f7b7f9c4b73caba2370353b4aaaf77a5fa Mon Sep 17 00:00:00 2001 From: LetLetMe Date: Fri, 20 Sep 2024 16:08:33 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dpr=E4=B8=AD=E6=8F=90?= =?UTF-8?q?=E5=87=BA=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../apache/rocketmq/broker/offset/ConsumerOffsetManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java index d3874d91d51..403324137cc 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java @@ -384,7 +384,7 @@ public boolean loadDataVersion() { if (obj != null) { this.dataVersion = obj.dataVersion; } - LOG.info("load consumer offset dataVersion success, " + fileName + " " + jsonString); + LOG.info("load consumer offset dataVersion success,{},{} ", fileName, jsonString); } return true; } catch (Exception e) {