Commit 4f32efe: CQ dual write (cq双写)

LetLetMe committed Aug 29, 2024
1 parent a1ea1eb
Showing 23 changed files with 637 additions and 27 deletions.

BrokerController.java
@@ -789,6 +789,9 @@ public boolean initializeMessageStore() {
defaultMessageStore = new RocksDBMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig, topicConfigManager.getTopicConfigTable());
} else {
defaultMessageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig, topicConfigManager.getTopicConfigTable());
if (messageStoreConfig.isRocksdbCQWriteEnable()) {
defaultMessageStore.enableRocksdbCQWrite();
}
}

if (messageStoreConfig.isEnableDLegerCommitLog()) {
@@ -815,6 +818,8 @@ public boolean initializeMessageStore() {
} catch (IOException e) {
result = false;
LOG.error("BrokerController#initialize: unexpected error occurs", e);
} catch (Exception e) {
throw new RuntimeException(e);
}
return result;
}
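
Note: the hunk above only flips the switch; the body of enableRocksdbCQWrite() is not among the lines shown here. A minimal sketch of the dual-write idea behind rocksdbCQWriteEnable, using hypothetical types (CqWriter and DualWriteCqStore are illustrative names, not RocketMQ classes): each consume-queue entry is appended to the existing file-based CQ and, in parallel, to a shadow RocksDB CQ so the two stores can later be compared.

// Illustrative sketch only; CqWriter and DualWriteCqStore are hypothetical,
// not RocketMQ types. The real wiring sits behind enableRocksdbCQWrite().
interface CqWriter {
    void putEntry(String topic, int queueId, long commitLogOffset, int size, long tagsCode);
}

class DualWriteCqStore implements CqWriter {
    private final CqWriter fileCq;    // existing mmap-based consume queue, still the read path
    private final CqWriter rocksdbCq; // shadow RocksDB consume queue, written for later diffing

    DualWriteCqStore(CqWriter fileCq, CqWriter rocksdbCq) {
        this.fileCq = fileCq;
        this.rocksdbCq = rocksdbCq;
    }

    @Override
    public void putEntry(String topic, int queueId, long commitLogOffset, int size, long tagsCode) {
        fileCq.putEntry(topic, queueId, commitLogOffset, size, tagsCode);
        rocksdbCq.putEntry(topic, queueId, commitLogOffset, size, tagsCode);
    }
}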
ConsumerOffsetManager.java
@@ -31,6 +31,7 @@
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -373,6 +374,25 @@ public void setDataVersion(DataVersion dataVersion) {
this.dataVersion = dataVersion;
}

public boolean loadDataVersion() {
String fileName = null;
try {
fileName = this.configFilePath();
String jsonString = MixAll.file2String(fileName);
if (jsonString != null) {
ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class);
if (obj != null) {
this.dataVersion = obj.dataVersion;
}
LOG.info("load consumer offset dataVersion success, " + fileName + " " + jsonString);
}
return true;
} catch (Exception e) {
LOG.error("load consumer offset dataVersion failed " + fileName, e);
return false;
}
}

public void removeOffset(final String group) {
Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
while (it.hasNext()) {
RocksDBConsumerOffsetManager.java
@@ -23,29 +23,74 @@

import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.RocksDBConfigManager;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.utils.DataConverter;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.rocksdb.WriteBatch;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;

public class RocksDBConsumerOffsetManager extends ConsumerOffsetManager {

protected static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);

protected RocksDBConfigManager rocksDBConfigManager;

public RocksDBConsumerOffsetManager(BrokerController brokerController) {
super(brokerController);
this.rocksDBConfigManager = new RocksDBConfigManager(configFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
this.rocksDBConfigManager = new RocksDBConfigManager(rocksdbConfigFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs());
}

@Override
public boolean load() {
if (!rocksDBConfigManager.init()) {
return false;
}
return this.rocksDBConfigManager.loadData(this::decodeOffset);
if (!loadDataVersion() || !loadConsumerOffset()) {
return false;
}

return true;
}

public boolean loadConsumerOffset() {
return this.rocksDBConfigManager.loadData(this::decodeOffset) && merge();
}

private boolean merge() {
if (!brokerController.getMessageStoreConfig().isTransferOffsetJsonToRocksdb()) {
log.info("the switch transferOffsetJsonToRocksdb is off, no merge offset operation is needed.");
return true;
}
if (!UtilAll.isPathExists(this.configFilePath()) && !UtilAll.isPathExists(this.configFilePath() + ".bak")) {
log.info("consumerOffset json file is not exist, so skip merge");
return true;
}
if (!super.loadDataVersion()) {
log.error("load json consumerOffset dataVersion error, startup will exit");
return false;
}

final DataVersion dataVersion = super.getDataVersion();
final DataVersion kvDataVersion = this.getDataVersion();
if (dataVersion.getCounter().get() > kvDataVersion.getCounter().get()) {
if (!super.load()) {
log.error("load json consumerOffset info failed, startup will exit");
return false;
}
this.persist();
this.getDataVersion().assignNewOne(dataVersion);
updateDataVersion();
}
log.info("update offset from json, dataVersion:{}, offsetTable: {} ", this.getDataVersion(), JSON.toJSONString(this.getOffsetTable()));
return true;
}


@Override
public boolean stop() {
return this.rocksDBConfigManager.stop();
@@ -69,8 +114,7 @@ protected void decodeOffset(final byte[] key, final byte[] body) {
LOG.info("load exist local offset, {}, {}", topicAtGroup, wrapper.getOffsetTable());
}

@Override
public String configFilePath() {
public String rocksdbConfigFilePath() {
return this.brokerController.getMessageStoreConfig().getStorePathRootDir() + File.separator + "config" + File.separator + "consumerOffsets" + File.separator;
}

@@ -103,4 +147,22 @@ private void putWriteBatch(final WriteBatch writeBatch, final String topicGroupN
byte[] valueBytes = JSON.toJSONBytes(wrapper, SerializerFeature.BrowserCompatible);
writeBatch.put(keyBytes, valueBytes);
}

@Override
public boolean loadDataVersion() {
return this.rocksDBConfigManager.loadDataVersion();
}

@Override
public DataVersion getDataVersion() {
return rocksDBConfigManager.getKvDataVersion();
}

public void updateDataVersion() {
try {
rocksDBConfigManager.updateKvDataVersion();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
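
Note: the consumer-offset manager above and the subscription-group manager further down apply the same one-way migration rule: JSON-backed state is imported into RocksDB only while the JSON dataVersion counter is ahead of the RocksDB one, after which the two counters are aligned so later startups skip the import. A condensed sketch of that guard (shouldMigrate is a hypothetical helper, not RocketMQ API):

// Hypothetical helper summarizing the merge() guard used by both RocksDB config managers.
final class JsonToKvMigrationRule {
    private JsonToKvMigrationRule() { }

    static boolean shouldMigrate(long jsonDataVersionCounter, long kvDataVersionCounter) {
        // Import from the JSON file only while it has recorded more updates than the KV store;
        // after a merge the KV counter is assigned the JSON counter, so this returns false next time.
        return jsonDataVersionCounter > kvDataVersionCounter;
    }
}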
AdminBrokerProcessor.java
@@ -18,6 +18,7 @@

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
@@ -32,6 +33,7 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -69,6 +71,7 @@
import org.apache.rocketmq.common.LockCallback;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UnlockCallback;
@@ -209,6 +212,7 @@
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.RocksDBMessageStore;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
@@ -339,6 +343,8 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx,
return fetchAllConsumeStatsInBroker(ctx, request);
case RequestCode.QUERY_CONSUME_QUEUE:
return queryConsumeQueue(ctx, request);
case RequestCode.DIFF_CONSUME_QUEUE:
return this.diffConsumeQueue(ctx, request);
case RequestCode.UPDATE_AND_GET_GROUP_FORBIDDEN:
return this.updateAndGetGroupForbidden(ctx, request);
case RequestCode.GET_SUBSCRIPTIONGROUP_CONFIG:
@@ -458,6 +464,79 @@ private RemotingCommand updateAndGetGroupForbidden(ChannelHandlerContext ctx, Re
return response;
}

private RemotingCommand diffConsumeQueue(ChannelHandlerContext ctx, RemotingCommand request) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
response.setCode(ResponseCode.SUCCESS);
response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", "diff success, very good!")));

DefaultMessageStore messageStore = (DefaultMessageStore) brokerController.getMessageStore();
RocksDBMessageStore rocksDBMessageStore = messageStore.getRocksDBMessageStore();

if (!messageStore.getMessageStoreConfig().isRocksdbCQWriteEnable()) {
response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", "RocksdbCQWriteEnable is false, diffConsumeQueue is invalid")));
return response;
}

ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> cqTable = messageStore.getConsumeQueueTable();
Random random = new Random();
for (Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> topicToCqListEntry : cqTable.entrySet()) {
String topic = topicToCqListEntry.getKey();
ConcurrentMap<Integer, ConsumeQueueInterface> queueIdToCqMap = topicToCqListEntry.getValue();
for (Map.Entry<Integer, ConsumeQueueInterface> queueIdToCqEntry : queueIdToCqMap.entrySet()) {
Integer queueId = queueIdToCqEntry.getKey();

ConsumeQueueInterface jsonCq = queueIdToCqEntry.getValue();
ConsumeQueueInterface kvCq = rocksDBMessageStore.getConsumeQueue(topic, queueId);

CqUnit kvCqEarliestUnit = kvCq.getEarliestUnit();
CqUnit kvCqLatestUnit = kvCq.getLatestUnit();
CqUnit jsonCqEarliestUnit = jsonCq.getEarliestUnit();
CqUnit jsonCqLatestUnit = jsonCq.getLatestUnit();
LOGGER.info("diffConsumeQueue topic:{}, queue:{}, kvCqEarliestUnit:{}, jsonCqEarliestUnit:{}, kvCqLatestUnit:{}, jsonCqLatestUnit:{}",
topic, queueId, kvCqEarliestUnit, jsonCqEarliestUnit, kvCqLatestUnit, jsonCqLatestUnit);

long jsonOffset = jsonCq.getMaxOffsetInQueue() - 1;
int sampleCount = 10;

Set<Long> sampledOffsets = new HashSet<>();

if (jsonOffset > 100) {
// Randomly sample 10 offsets from the last 100 entries
long startOffset = jsonOffset - 100;
while (sampledOffsets.size() < sampleCount) {
long randomOffset = startOffset + random.nextInt(100);
sampledOffsets.add(randomOffset);
}
} else if (jsonOffset > 10) {
// Take the last 10 entries
long startOffset = jsonOffset - 10;
for (long i = startOffset; i < jsonOffset; i++) {
sampledOffsets.add(i);
}
} else {
// Take all available entries if less than 10
for (long i = 0; i < jsonOffset; i++) {
sampledOffsets.add(i);
}
}

for (long currentOffset : sampledOffsets) {
Pair<CqUnit, Long> kvCqUnitTime = kvCq.getCqUnitAndStoreTime(currentOffset);
Pair<CqUnit, Long> jsonCqUnitTime = jsonCq.getCqUnitAndStoreTime(currentOffset);
if (!checkCqUnitEqual(kvCqUnitTime.getObject1(), jsonCqUnitTime.getObject1())) {
String diffInfo = String.format("Difference found at topic:%s, queue:%s, offset:%s - kvCqUnit:%s, jsonCqUnit:%s",
topic, queueId, currentOffset, kvCqUnitTime.getObject1(), jsonCqUnitTime.getObject1());
LOGGER.error(diffInfo);
response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", diffInfo)));
return response;
}
}
}

}
return response;
}

@Override
public boolean rejectRequest() {
return false;
@@ -3305,4 +3384,23 @@ private boolean validateBlackListConfigExist(Properties properties) {
}
return false;
}

private boolean checkCqUnitEqual(CqUnit cqUnit1, CqUnit cqUnit2) {
if (cqUnit1.getQueueOffset() != cqUnit2.getQueueOffset()) {
return false;
}
if (cqUnit1.getSize() != cqUnit2.getSize()) {
return false;
}
if (cqUnit1.getPos() != cqUnit2.getPos()) {
return false;
}
if (cqUnit1.getBatchNum() != cqUnit2.getBatchNum()) {
return false;
}
if (cqUnit1.getTagsCode() != cqUnit2.getTagsCode()) {
return false;
}
return true;
}
}
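
Note: none of the lines shown here issue the new DIFF_CONSUME_QUEUE request. A rough sketch of how an admin-side caller might trigger it and read the diffResult field returned above, assuming the standard remoting client API; the broker address, timeout, and the absence of a custom request header are illustrative choices, and error handling is omitted.

import java.nio.charset.StandardCharsets;

import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestCode;

public class DiffConsumeQueueExample {
    public static void main(String[] args) throws Exception {
        NettyRemotingClient client = new NettyRemotingClient(new NettyClientConfig());
        client.start();
        try {
            // DIFF_CONSUME_QUEUE is the request code dispatched to diffConsumeQueue() above.
            RemotingCommand request =
                RemotingCommand.createRequestCommand(RequestCode.DIFF_CONSUME_QUEUE, null);
            RemotingCommand response = client.invokeSync("127.0.0.1:10911", request, 10_000);
            // The handler replies with a JSON body of the form {"diffResult": "..."}.
            String diffResult = JSON.parseObject(
                new String(response.getBody(), StandardCharsets.UTF_8)).getString("diffResult");
            System.out.println(diffResult);
        } finally {
            client.shutdown();
        }
    }
}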
RocksDBSubscriptionGroupManager.java
@@ -79,28 +79,30 @@ public boolean loadForbidden(BiConsumer<byte[], byte[]> biConsumer) {

private boolean merge() {
if (!brokerController.getMessageStoreConfig().isTransferMetadataJsonToRocksdb()) {
log.info("The switch is off, no merge operation is needed.");
log.info("the switch transferMetadataJsonToRocksdb is off, no merge subGroup operation is needed.");
return true;
}
if (!UtilAll.isPathExists(this.configFilePath()) && !UtilAll.isPathExists(this.configFilePath() + ".bak")) {
log.info("json file and json back file not exist, so skip merge");
log.info("subGroup json file is not exist, so skip merge");
return true;
}

if (!super.load()) {
log.error("load group and forbidden info from json file error, startup will exit");
if (!super.loadDataVersion()) {
log.error("load json subGroup dataVersion error, startup will exit");
return false;
}

final ConcurrentMap<String, SubscriptionGroupConfig> groupTable = this.getSubscriptionGroupTable();
final ConcurrentMap<String, ConcurrentMap<String, Integer>> forbiddenTable = this.getForbiddenTable();
final DataVersion dataVersion = super.getDataVersion();
final DataVersion kvDataVersion = this.getDataVersion();
if (dataVersion.getCounter().get() > kvDataVersion.getCounter().get()) {
if (!super.load()) {
log.error("load group and forbidden info from json file error, startup will exit");
return false;
}
final ConcurrentMap<String, SubscriptionGroupConfig> groupTable = this.getSubscriptionGroupTable();
for (Map.Entry<String, SubscriptionGroupConfig> entry : groupTable.entrySet()) {
putSubscriptionGroupConfig(entry.getValue());
log.info("import subscription config to rocksdb, group={}", entry.getValue());
}
final ConcurrentMap<String, ConcurrentMap<String, Integer>> forbiddenTable = this.getForbiddenTable();
for (Map.Entry<String, ConcurrentMap<String, Integer>> entry : forbiddenTable.entrySet()) {
try {
this.rocksDBConfigManager.updateForbidden(entry.getKey(), JSON.toJSONString(entry.getValue()));
@@ -110,8 +112,10 @@ private boolean merge() {
return false;
}
}
this.rocksDBConfigManager.getKvDataVersion().assignNewOne(dataVersion);
this.getDataVersion().assignNewOne(dataVersion);
updateDataVersion();
} else {
log.info("dataVersion is not greater than kvDataVersion, no need to merge group metaData, dataVersion={}, kvDataVersion={}", dataVersion, kvDataVersion);
}
log.info("finish marge subscription config from json file and merge to rocksdb");
this.persist();
SubscriptionGroupManager.java
@@ -334,6 +334,26 @@ public DataVersion getDataVersion() {
return dataVersion;
}

public boolean loadDataVersion() {
String fileName = null;
try {
fileName = this.configFilePath();
String jsonString = MixAll.file2String(fileName);
if (jsonString != null) {
SubscriptionGroupManager obj = RemotingSerializable.fromJson(jsonString, SubscriptionGroupManager.class);
if (obj != null) {
this.dataVersion.assignNewOne(obj.dataVersion);
this.printLoadDataWhenFirstBoot(obj);
// Log inside the null check so a malformed file cannot trigger an NPE on obj.dataVersion.
log.info("load subGroup dataVersion success, " + fileName + " " + obj.dataVersion);
}
}
return true;
} catch (Exception e) {
log.error("load subGroup dataVersion failed" + fileName , e);
return false;
}
}

public void deleteSubscriptionGroupConfig(final String groupName) {
SubscriptionGroupConfig old = removeSubscriptionGroupConfig(groupName);
this.forbiddenTable.remove(groupName);
(Diff truncated; the remaining changed files are not shown here.)
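
Note: the behaviour added in this commit is gated by three MessageStoreConfig switches that appear in the hunks above. A small sketch that reads them; the corresponding broker.conf property names are assumed to follow the usual camelCase mapping of these getters.

import org.apache.rocketmq.store.config.MessageStoreConfig;

public class CqDualWriteSwitches {
    public static void main(String[] args) {
        MessageStoreConfig cfg = new MessageStoreConfig();
        // Write every consume-queue entry to RocksDB in addition to the mmap files.
        boolean dualWriteCq = cfg.isRocksdbCQWriteEnable();
        // One-time merge of the consumerOffset json file into the RocksDB-backed offset store.
        boolean migrateOffsets = cfg.isTransferOffsetJsonToRocksdb();
        // One-time merge of subscription-group and forbidden metadata into RocksDB.
        boolean migrateMetadata = cfg.isTransferMetadataJsonToRocksdb();
        System.out.printf("rocksdbCQWriteEnable=%s, transferOffsetJsonToRocksdb=%s, transferMetadataJsonToRocksdb=%s%n",
            dualWriteCq, migrateOffsets, migrateMetadata);
    }
}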