From 8bab7bc98da2fe565020297591be7418cbf3459c Mon Sep 17 00:00:00 2001 From: LetLetMe Date: Tue, 24 Sep 2024 15:47:18 +0800 Subject: [PATCH 01/13] =?UTF-8?q?=E5=8E=BB=E6=8E=89kvCQ=20batch=20write?= =?UTF-8?q?=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java index 34c6d2f3956..da11e46fd9e 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java @@ -55,7 +55,7 @@ public class RocksDBConsumeQueueStore extends AbstractConsumeQueueStore { public static final byte CTRL_1 = '\u0001'; public static final byte CTRL_2 = '\u0002'; - private static final int BATCH_SIZE = 16; + private static final int BATCH_SIZE = 1; public static final int MAX_KEY_LEN = 300; private final ScheduledExecutorService scheduledExecutorService; @@ -168,8 +168,6 @@ private boolean shutdownInner() { public void putMessagePositionInfoWrapper(DispatchRequest request) throws RocksDBException { if (request != null) { this.bufferDRList.add(request); - } - if (request == null || !enableBatchWriteKvCq || this.bufferDRList.size() >= BATCH_SIZE) { putMessagePosition(); } } From 7528ce76344cb174a3efed72b7d31da701243a3b Mon Sep 17 00:00:00 2001 From: LetLetMe Date: Tue, 24 Sep 2024 17:14:05 +0800 Subject: [PATCH 02/13] Remove kvCQ batch write --- .../rocketmq/store/config/MessageStoreConfig.java | 10 +++++----- .../store/queue/RocksDBConsumeQueueStore.java | 11 ++++++----- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index c077831f3c4..68531284389 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -428,15 +428,15 @@ public class MessageStoreConfig { private boolean rocksdbCQDoubleWriteEnable = false; - private boolean enableBatchWriteKvCq = true; + private int batchWriteKvCqSize = 16; - public boolean isEnableBatchWriteKvCq() { - return enableBatchWriteKvCq; + public int getBatchWriteKvCqSize() { + return batchWriteKvCqSize; } - public void setEnableBatchWriteKvCq(boolean enableBatchWriteKvCq) { - this.enableBatchWriteKvCq = enableBatchWriteKvCq; + public void setBatchWriteKvCqSize(int batchWriteKvCqSize) { + this.batchWriteKvCqSize = batchWriteKvCqSize; } public boolean isRocksdbCQDoubleWriteEnable() { diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java index da11e46fd9e..68ea38cee45 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java @@ -55,7 +55,7 @@ public class RocksDBConsumeQueueStore extends AbstractConsumeQueueStore { public static final byte CTRL_1 = '\u0001'; public static final byte CTRL_2 = '\u0002'; - private static final int BATCH_SIZE = 1; + private int BATCH_SIZE = 16; public static final int MAX_KEY_LEN = 300; private final ScheduledExecutorService scheduledExecutorService; @@ -77,8 +77,6 @@ public class RocksDBConsumeQueueStore extends AbstractConsumeQueueStore { private final Map> tempTopicQueueMaxOffsetMap; private volatile boolean isCQError = false; - private boolean enableBatchWriteKvCq; - public RocksDBConsumeQueueStore(DefaultMessageStore messageStore) { super(messageStore); @@ -88,7 +86,7 @@ public RocksDBConsumeQueueStore(DefaultMessageStore messageStore) { this.rocksDBConsumeQueueOffsetTable = new RocksDBConsumeQueueOffsetTable(rocksDBConsumeQueueTable, rocksDBStorage, messageStore); this.writeBatch = new WriteBatch(); - this.enableBatchWriteKvCq = messageStoreConfig.isEnableBatchWriteKvCq(); + this.BATCH_SIZE = messageStoreConfig.getBatchWriteKvCqSize(); this.bufferDRList = new ArrayList(BATCH_SIZE); this.cqBBPairList = new ArrayList(BATCH_SIZE); this.offsetBBPairList = new ArrayList(BATCH_SIZE); @@ -166,9 +164,12 @@ private boolean shutdownInner() { @Override public void putMessagePositionInfoWrapper(DispatchRequest request) throws RocksDBException { + if (request == null || this.bufferDRList.size() >= BATCH_SIZE) { + putMessagePosition(); + } + if (request != null) { this.bufferDRList.add(request); - putMessagePosition(); } } From 23f14773f2239c66bf47520fccfc2a65b97c229a Mon Sep 17 00:00:00 2001 From: LetLetMe Date: Tue, 24 Sep 2024 17:15:13 +0800 Subject: [PATCH 03/13] Remove kvCQ batch write --- .../apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java index 68ea38cee45..b4d1f972b32 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java @@ -55,7 +55,7 @@ public class RocksDBConsumeQueueStore extends AbstractConsumeQueueStore { public static final byte CTRL_1 = '\u0001'; public static final byte CTRL_2 = '\u0002'; - private int BATCH_SIZE = 16; + private final int BATCH_SIZE; public static final int MAX_KEY_LEN = 300; private final ScheduledExecutorService scheduledExecutorService; @@ -167,7 +167,6 @@ public void putMessagePositionInfoWrapper(DispatchRequest request) throws RocksD if (request == null || this.bufferDRList.size() >= BATCH_SIZE) { putMessagePosition(); } - if (request != null) { this.bufferDRList.add(request); } From d4655cbbdbd94bc33f357b331739f4aa7d8cf2d5 Mon Sep 17 00:00:00 2001 From: LetLetMe Date: Tue, 24 Sep 2024 17:21:35 +0800 Subject: [PATCH 04/13] Remove kvCQ batch write --- .../store/queue/RocksDBConsumeQueueStore.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java index b4d1f972b32..08c7264d886 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java @@ -55,7 +55,7 @@ public class RocksDBConsumeQueueStore extends AbstractConsumeQueueStore { public static final byte CTRL_1 = '\u0001'; public static final byte CTRL_2 = '\u0002'; - private final int BATCH_SIZE; + private final int batchSize; public static final int MAX_KEY_LEN = 300; private final ScheduledExecutorService scheduledExecutorService; @@ -86,11 +86,11 @@ public RocksDBConsumeQueueStore(DefaultMessageStore messageStore) { this.rocksDBConsumeQueueOffsetTable = new RocksDBConsumeQueueOffsetTable(rocksDBConsumeQueueTable, rocksDBStorage, messageStore); this.writeBatch = new WriteBatch(); - this.BATCH_SIZE = messageStoreConfig.getBatchWriteKvCqSize(); - this.bufferDRList = new ArrayList(BATCH_SIZE); - this.cqBBPairList = new ArrayList(BATCH_SIZE); - this.offsetBBPairList = new ArrayList(BATCH_SIZE); - for (int i = 0; i < BATCH_SIZE; i++) { + this.batchSize = messageStoreConfig.getBatchWriteKvCqSize(); + this.bufferDRList = new ArrayList(batchSize); + this.cqBBPairList = new ArrayList(batchSize); + this.offsetBBPairList = new ArrayList(batchSize); + for (int i = 0; i < batchSize; i++) { this.cqBBPairList.add(RocksDBConsumeQueueTable.getCQByteBufferPair()); this.offsetBBPairList.add(RocksDBConsumeQueueOffsetTable.getOffsetByteBufferPair()); } @@ -164,7 +164,7 @@ private boolean shutdownInner() { @Override public void putMessagePositionInfoWrapper(DispatchRequest request) throws RocksDBException { - if (request == null || this.bufferDRList.size() >= BATCH_SIZE) { + if (request == null || this.bufferDRList.size() >= batchSize) { putMessagePosition(); } if (request != null) { From c541a7bf1c5e77de79a14431b929edc85a85e3d9 Mon Sep 17 00:00:00 2001 From: LetLetMe Date: Tue, 24 Sep 2024 17:58:18 +0800 Subject: [PATCH 05/13] Remove kvCQ batch write --- .../apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java | 1 + 1 file changed, 1 insertion(+) diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java index 08c7264d886..c889ae7ca85 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java @@ -167,6 +167,7 @@ public void putMessagePositionInfoWrapper(DispatchRequest request) throws RocksD if (request == null || this.bufferDRList.size() >= batchSize) { putMessagePosition(); } + if (request != null) { this.bufferDRList.add(request); } From 9f2713556055fbd6e1241b25847d350f9d8d8992 Mon Sep 17 00:00:00 2001 From: LetLetMe Date: Tue, 24 Sep 2024 20:34:47 +0800 Subject: [PATCH 06/13] Remove kvCQ batch write --- .../java/org/apache/rocketmq/tools/command/MQAdminStartup.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java index 43e4259c4e1..313a777ce4f 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java @@ -104,6 +104,7 @@ import org.apache.rocketmq.tools.command.offset.ResetOffsetByTimeCommand; import org.apache.rocketmq.tools.command.offset.SkipAccumulationSubCommand; import org.apache.rocketmq.tools.command.producer.ProducerSubCommand; +import org.apache.rocketmq.tools.command.queue.CheckRocksdbCqWriteProgressCommand; import org.apache.rocketmq.tools.command.queue.QueryConsumeQueueCommand; import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand; import org.apache.rocketmq.tools.command.topic.AllocateMQSubCommand; @@ -304,6 +305,7 @@ public static void initCommand() { initCommand(new ListAclSubCommand()); initCommand(new CopyAclsSubCommand()); initCommand(new RocksDBConfigToJsonCommand()); + initCommand(new CheckRocksdbCqWriteProgressCommand()); } private static void printHelp() { From 2a2d619d9edc86a1200bff274fef7b1673d56b9d Mon Sep 17 00:00:00 2001 From: LetLetMe Date: Wed, 25 Sep 2024 11:19:32 +0800 Subject: [PATCH 07/13] Remove kvCQ batch write --- .../tools/command/queue/CheckRocksdbCqWriteProgressCommand.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java index 82dcb741962..48dd2e4920a 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java @@ -34,7 +34,7 @@ public class CheckRocksdbCqWriteProgressCommand implements SubCommand { @Override public String commandName() { - return "checkRocksdbCqWriteProgressCommandCommand"; + return "checkRocksdbCqWriteProgress"; } @Override From 81e74b553b433d5774b444645b7f001cc8828cc3 Mon Sep 17 00:00:00 2001 From: LetLetMe Date: Wed, 25 Sep 2024 11:32:53 +0800 Subject: [PATCH 08/13] Remove kvCQ batch write --- .../broker/processor/AdminBrokerProcessor.java | 16 +++++++++++----- .../store/plugin/AbstractPluginMessageStore.java | 4 ++++ 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 863f16e515e..49528e8cf0f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -215,6 +215,7 @@ import org.apache.rocketmq.store.RocksDBMessageStore; import org.apache.rocketmq.store.SelectMappedBufferResult; import org.apache.rocketmq.store.config.BrokerRole; +import org.apache.rocketmq.store.plugin.AbstractPluginMessageStore; import org.apache.rocketmq.store.queue.ConsumeQueueInterface; import org.apache.rocketmq.store.queue.CqUnit; import org.apache.rocketmq.store.queue.ReferredIterator; @@ -470,15 +471,20 @@ private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext ctx, R String requestTopic = requestHeader.getTopic(); final RemotingCommand response = RemotingCommand.createResponseCommand(null); response.setCode(ResponseCode.SUCCESS); - - DefaultMessageStore messageStore = (DefaultMessageStore) brokerController.getMessageStore(); - RocksDBMessageStore rocksDBMessageStore = messageStore.getRocksDBMessageStore(); - if (!messageStore.getMessageStoreConfig().isRocksdbCQDoubleWriteEnable()) { + MessageStore messageStore = brokerController.getMessageStore(); + DefaultMessageStore defaultMessageStore; + if (messageStore instanceof AbstractPluginMessageStore){ + defaultMessageStore = (DefaultMessageStore)((AbstractPluginMessageStore) messageStore).getNext(); + }else { + defaultMessageStore = (DefaultMessageStore)messageStore; + } + RocksDBMessageStore rocksDBMessageStore = defaultMessageStore.getRocksDBMessageStore(); + if (!defaultMessageStore.getMessageStoreConfig().isRocksdbCQDoubleWriteEnable()) { response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", "rocksdbCQWriteEnable is false, checkRocksdbCqWriteProgressCommand is invalid"))); return response; } - ConcurrentMap> cqTable = messageStore.getConsumeQueueTable(); + ConcurrentMap> cqTable = defaultMessageStore.getConsumeQueueTable(); StringBuilder diffResult = new StringBuilder("check success, all is ok!\n"); try { if (StringUtils.isNotBlank(requestTopic)) { diff --git a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java index 2f2ce981257..2401257c306 100644 --- a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java @@ -661,4 +661,8 @@ public void recoverTopicQueueTable() { public void notifyMessageArriveIfNecessary(DispatchRequest dispatchRequest) { next.notifyMessageArriveIfNecessary(dispatchRequest); } + + public MessageStore getNext() { + return next; + } } From b9e79d9cdaa2eafba3d499cb796025ad668f588f Mon Sep 17 00:00:00 2001 From: LetLetMe Date: Wed, 25 Sep 2024 11:38:16 +0800 Subject: [PATCH 09/13] Remove kvCQ batch write --- .../rocketmq/broker/processor/AdminBrokerProcessor.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 49528e8cf0f..1407764bf29 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -473,10 +473,10 @@ private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext ctx, R response.setCode(ResponseCode.SUCCESS); MessageStore messageStore = brokerController.getMessageStore(); DefaultMessageStore defaultMessageStore; - if (messageStore instanceof AbstractPluginMessageStore){ - defaultMessageStore = (DefaultMessageStore)((AbstractPluginMessageStore) messageStore).getNext(); - }else { - defaultMessageStore = (DefaultMessageStore)messageStore; + if (messageStore instanceof AbstractPluginMessageStore) { + defaultMessageStore = (DefaultMessageStore) ((AbstractPluginMessageStore) messageStore).getNext(); + } else { + defaultMessageStore = (DefaultMessageStore) messageStore; } RocksDBMessageStore rocksDBMessageStore = defaultMessageStore.getRocksDBMessageStore(); if (!defaultMessageStore.getMessageStoreConfig().isRocksdbCQDoubleWriteEnable()) { From 1e0c8273a406ea35ea423d92df0507f20d8bbc4e Mon Sep 17 00:00:00 2001 From: LetLetMe Date: Wed, 25 Sep 2024 12:02:50 +0800 Subject: [PATCH 10/13] Remove kvCQ batch write --- .../command/queue/CheckRocksdbCqWriteProgressCommand.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java index 48dd2e4920a..4e83d38f658 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java @@ -82,9 +82,9 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { String brokerAddr = brokerData.getBrokerAddrs().get(0L); CheckRocksdbCqWriteProgressResponseBody body = defaultMQAdminExt.checkRocksdbCqWriteProgress(brokerAddr, topic); if (StringUtils.isNotBlank(topic)) { - System.out.printf(body.getDiffResult()); + System.out.print(body.getDiffResult()); } else { - System.out.printf(brokerName + " | " + brokerAddr + " | " + body.getDiffResult()); + System.out.print(brokerName + " | " + brokerAddr + " | " + body.getDiffResult()); } } From 40b98b2ec5e8c8b1230f5d3a5739c383d30ef46d Mon Sep 17 00:00:00 2001 From: LetLetMe Date: Wed, 25 Sep 2024 14:22:52 +0800 Subject: [PATCH 11/13] Remove kvCQ batch write --- .../rocketmq/broker/processor/AdminBrokerProcessor.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 1407764bf29..e7d79358cc5 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -485,7 +485,7 @@ private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext ctx, R } ConcurrentMap> cqTable = defaultMessageStore.getConsumeQueueTable(); - StringBuilder diffResult = new StringBuilder("check success, all is ok!\n"); + StringBuilder diffResult = new StringBuilder(); try { if (StringUtils.isNotBlank(requestTopic)) { processConsumeQueuesForTopic(cqTable.get(requestTopic), requestTopic, rocksDBMessageStore, diffResult,false); @@ -527,10 +527,10 @@ private void processConsumeQueuesForTopic(ConcurrentMap Date: Wed, 25 Sep 2024 15:02:24 +0800 Subject: [PATCH 12/13] Remove kvCQ batch write --- .../apache/rocketmq/broker/processor/AdminBrokerProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index e7d79358cc5..866517ff992 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -527,7 +527,7 @@ private void processConsumeQueuesForTopic(ConcurrentMap Date: Wed, 25 Sep 2024 15:49:40 +0800 Subject: [PATCH 13/13] Remove kvCQ batch write --- .../apache/rocketmq/broker/processor/AdminBrokerProcessor.java | 2 +- .../tools/command/queue/CheckRocksdbCqWriteProgressCommand.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 866517ff992..80f3f44facb 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -522,7 +522,7 @@ private void processConsumeQueuesForTopic(ConcurrentMap fileCqUnit = jsonCq.getCqUnitAndStoreTime(i); Pair kvCqUnit = kvCq.getCqUnitAndStoreTime(i); if (fileCqUnit == null || kvCqUnit == null) { - diffResult.append(String.format("[topic: %s, queue: %s, offset: %s] \n kv : %s \n file: %s \n", + diffResult.append(String.format("[topic: %s, queue: %s, offset: %s] \n kv : %s \n file : %s \n", topic, queueId, i, kvCqUnit != null ? kvCqUnit.getObject1() : "null", fileCqUnit != null ? fileCqUnit.getObject1() : "null")); return; } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java index 4e83d38f658..d18a24ee1dc 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java @@ -84,7 +84,7 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { if (StringUtils.isNotBlank(topic)) { System.out.print(body.getDiffResult()); } else { - System.out.print(brokerName + " | " + brokerAddr + " | " + body.getDiffResult()); + System.out.print(brokerName + " | " + brokerAddr + " | \n" + body.getDiffResult()); } }