Skip to content

Commit

Permalink
Reduce unnecessary switches
Browse files Browse the repository at this point in the history
  • Loading branch information
LetLetMe committed Oct 24, 2024
1 parent 77b7e55 commit c3fd604
Showing 1 changed file with 19 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.CheckRocksdbCqWriteResult;
import org.apache.rocketmq.common.KeyBuilder;
Expand Down Expand Up @@ -227,6 +228,7 @@
import org.apache.rocketmq.store.timer.TimerCheckpoint;
import org.apache.rocketmq.store.timer.TimerMessageStore;
import org.apache.rocketmq.store.util.LibC;
import org.rocksdb.RocksDBException;

import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_INVOCATION_STATUS;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
Expand Down Expand Up @@ -3404,20 +3406,24 @@ private CheckRocksdbCqWriteResult doCheckRocksdbCqWriteProgress(ChannelHandlerCo
StringBuilder diffResult = new StringBuilder();
try {
if (StringUtils.isNotBlank(requestTopic)) {
boolean checkResult = processConsumeQueuesForTopic(cqTable.get(requestTopic), requestTopic, rocksDBMessageStore, diffResult, false, requestHeader.getCheckStoreTime());
boolean checkResult = processConsumeQueuesForTopic(cqTable.get(requestTopic), requestTopic, rocksDBMessageStore, diffResult, true, requestHeader.getCheckStoreTime());
result.setCheckResult(diffResult.toString());
result.setCheckStatus(checkResult ? CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue() : CheckRocksdbCqWriteResult.CheckStatus.CHECK_NOT_OK.getValue());
return result;
}
boolean checkResult = true;
int successNum = 0;
for (Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> topicEntry : cqTable.entrySet()) {
String topic = topicEntry.getKey();
checkResult = processConsumeQueuesForTopic(topicEntry.getValue(), topic, rocksDBMessageStore, diffResult, true, requestHeader.getCheckStoreTime());
if (!checkResult) {
break;
checkResult = processConsumeQueuesForTopic(topicEntry.getValue(), topic, rocksDBMessageStore, diffResult, false, requestHeader.getCheckStoreTime());
if (checkResult) {
successNum++;
}
}
diffResult.append("check all topic successful, size:").append(cqTable.size());
// check all topic finish, all topic is ready -> ready
// check all topic finish, success/all : 89/100 -> not ready
String checkResultString = successNum == cqTable.size() ? "all topic is ready" : String.format("success/all : %s/%s", successNum, cqTable.size());
diffResult.append("check all topic finish, ").append(checkResultString);
result.setCheckResult(diffResult.toString());
result.setCheckStatus(checkResult ? CheckRocksdbCqWriteResult.CheckStatus.CHECK_OK.getValue() : CheckRocksdbCqWriteResult.CheckStatus.CHECK_NOT_OK.getValue());
} catch (Exception e) {
Expand All @@ -3428,18 +3434,19 @@ private CheckRocksdbCqWriteResult doCheckRocksdbCqWriteProgress(ChannelHandlerCo
return result;
}

private boolean processConsumeQueuesForTopic(ConcurrentMap<Integer, ConsumeQueueInterface> queueMap, String topic, RocksDBMessageStore rocksDBMessageStore, StringBuilder diffResult, boolean checkAll, long checkStoreTime) {
private boolean processConsumeQueuesForTopic(ConcurrentMap<Integer, ConsumeQueueInterface> queueMap, String topic, RocksDBMessageStore rocksDBMessageStore, StringBuilder diffResult, boolean printDetail, long checkStoreTime)
throws RocksDBException {
for (Map.Entry<Integer, ConsumeQueueInterface> queueEntry : queueMap.entrySet()) {
Integer queueId = queueEntry.getKey();
ConsumeQueueInterface jsonCq = queueEntry.getValue();
ConsumeQueueInterface kvCq = rocksDBMessageStore.getConsumeQueue(topic, queueId);
if (!checkAll) {
if (printDetail) {
String format = String.format("[topic: %s, queue: %s] \n kvEarliest : %s | kvLatest : %s \n fileEarliest: %s | fileEarliest: %s ",
topic, queueId, kvCq.getEarliestUnit(), kvCq.getLatestUnit(), jsonCq.getEarliestUnit(), jsonCq.getLatestUnit());
diffResult.append(format).append("\n");
}
long maxFileOffsetInQueue = jsonCq.getMaxOffsetInQueue();
long minOffsetInQueue = kvCq.getMinOffsetInQueue();
long minOffsetInQueue = rocksDBMessageStore.getConsumeQueueStore().getOffsetInQueueByTime(topic, queueId, checkStoreTime, BoundaryType.UPPER);

// The latest message is earlier than the check time
Pair<CqUnit, Long> fileLatestCq = jsonCq.getCqUnitAndStoreTime(maxFileOffsetInQueue);
Expand All @@ -3453,18 +3460,19 @@ private boolean processConsumeQueuesForTopic(ConcurrentMap<Integer, ConsumeQueue
Pair<CqUnit, Long> fileCqUnit = jsonCq.getCqUnitAndStoreTime(i);
Pair<CqUnit, Long> kvCqUnit = kvCq.getCqUnitAndStoreTime(i);
if (fileCqUnit == null || kvCqUnit == null) {
diffResult.append(String.format("[topic: %s, queue: %s, offset: %s] \n kv : %s \n file : %s \n",
topic, queueId, i, kvCqUnit != null ? kvCqUnit.getObject1() : "null", fileCqUnit != null ? fileCqUnit.getObject1() : "null"));
String format = String.format("[topic: %s, queue: %s, offset: %s] \n kv : %s \n file : %s \n",
topic, queueId, i, kvCqUnit != null ? kvCqUnit.getObject1() : "null", fileCqUnit != null ? fileCqUnit.getObject1() : "null");
LOGGER.error(format);
break;
}
// It's protective. It's not actually enforceable
if (kvCqUnit.getObject2() < checkStoreTime) {
continue;
}
if (!checkCqUnitEqual(kvCqUnit.getObject1(), fileCqUnit.getObject1())) {
String diffInfo = String.format("[topic: %s, queue: %s, offset: %s] \n file : %s \n kv : %s \n",
topic, queueId, i, kvCqUnit.getObject1(), fileCqUnit.getObject1());
LOGGER.error(diffInfo);
diffResult.append(diffInfo).append(System.lineSeparator());
return false;
}
}
Expand Down

0 comments on commit c3fd604

Please sign in to comment.