Skip to content

Commit

Permalink
[ISSUE #8693] Fix checking MultiDispatchMessage when appending commitlog
Browse files Browse the repository at this point in the history
  • Loading branch information
redlsz authored Sep 18, 2024
1 parent a28c2cb commit 280804c
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 4 deletions.
8 changes: 5 additions & 3 deletions store/src/main/java/org/apache/rocketmq/store/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.rocketmq.store.ha.HAService;
import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
import org.apache.rocketmq.store.logfile.MappedFile;
import org.apache.rocketmq.store.queue.MultiDispatchUtils;
import org.apache.rocketmq.store.util.LibC;
import org.rocksdb.RocksDBException;

Expand Down Expand Up @@ -1903,7 +1904,7 @@ public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer
// STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>

ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
boolean isMultiDispatchMsg = messageStoreConfig.isEnableMultiDispatch() && CommitLog.isMultiDispatchMsg(msgInner);
final boolean isMultiDispatchMsg = CommitLog.isMultiDispatchMsg(messageStoreConfig, msgInner);
if (isMultiDispatchMsg) {
AppendMessageResult appendMessageResult = handlePropertiesForLmqMsg(preEncodeBuffer, msgInner);
if (appendMessageResult != null) {
Expand Down Expand Up @@ -2244,8 +2245,9 @@ public FlushManager getFlushManager() {
return flushManager;
}

public static boolean isMultiDispatchMsg(MessageExtBrokerInner msg) {
return StringUtils.isNoneBlank(msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH)) && !msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX);
public static boolean isMultiDispatchMsg(MessageStoreConfig messageStoreConfig, MessageExtBrokerInner msg) {
return StringUtils.isNotBlank(msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH)) &&
MultiDispatchUtils.isNeedHandleMultiDispatch(messageStoreConfig, msg.getTopic());
}

private boolean isCloseReadAhead() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public PutMessageResult encodeWithoutProperties(MessageExtBrokerInner msgInner)
public PutMessageResult encode(MessageExtBrokerInner msgInner) {
this.byteBuf.clear();

if (messageStoreConfig.isEnableMultiDispatch() && CommitLog.isMultiDispatchMsg(msgInner)) {
if (CommitLog.isMultiDispatchMsg(messageStoreConfig, msgInner)) {
return encodeWithoutProperties(msgInner);
}

Expand Down

0 comments on commit 280804c

Please sign in to comment.