From 280804c5592341f92a43b6d72ec6e94db77b74ac Mon Sep 17 00:00:00 2001 From: Liu Shengzhong Date: Wed, 18 Sep 2024 13:57:20 +0800 Subject: [PATCH] [ISSUE #8693] Fix checking MultiDispatchMessage when appending commitlog --- .../main/java/org/apache/rocketmq/store/CommitLog.java | 8 +++++--- .../java/org/apache/rocketmq/store/MessageExtEncoder.java | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index f34c6944c99..972e71aadd8 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -61,6 +61,7 @@ import org.apache.rocketmq.store.ha.HAService; import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService; import org.apache.rocketmq.store.logfile.MappedFile; +import org.apache.rocketmq.store.queue.MultiDispatchUtils; import org.apache.rocketmq.store.util.LibC; import org.rocksdb.RocksDBException; @@ -1903,7 +1904,7 @@ public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET
ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff(); - boolean isMultiDispatchMsg = messageStoreConfig.isEnableMultiDispatch() && CommitLog.isMultiDispatchMsg(msgInner); + final boolean isMultiDispatchMsg = CommitLog.isMultiDispatchMsg(messageStoreConfig, msgInner); if (isMultiDispatchMsg) { AppendMessageResult appendMessageResult = handlePropertiesForLmqMsg(preEncodeBuffer, msgInner); if (appendMessageResult != null) { @@ -2244,8 +2245,9 @@ public FlushManager getFlushManager() { return flushManager; } - public static boolean isMultiDispatchMsg(MessageExtBrokerInner msg) { - return StringUtils.isNoneBlank(msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH)) && !msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX); + public static boolean isMultiDispatchMsg(MessageStoreConfig messageStoreConfig, MessageExtBrokerInner msg) { + return StringUtils.isNotBlank(msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH)) && + MultiDispatchUtils.isNeedHandleMultiDispatch(messageStoreConfig, msg.getTopic()); } private boolean isCloseReadAhead() { diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java index 20e9a652b7e..5c74918d9e6 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java @@ -175,7 +175,7 @@ public PutMessageResult encodeWithoutProperties(MessageExtBrokerInner msgInner) public PutMessageResult encode(MessageExtBrokerInner msgInner) { this.byteBuf.clear(); - if (messageStoreConfig.isEnableMultiDispatch() && CommitLog.isMultiDispatchMsg(msgInner)) { + if (CommitLog.isMultiDispatchMsg(messageStoreConfig, msgInner)) { return encodeWithoutProperties(msgInner); }