Skip to content

Commit

Permalink
[ISSUE #8698] Remove batch write in kv cq store and update rocksdb cq…
Browse files Browse the repository at this point in the history
… check tool (#8739)
  • Loading branch information
LetLetMe authored Sep 25, 2024
1 parent 59bafe8 commit 3b5cbf8
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@
import org.apache.rocketmq.store.RocksDBMessageStore;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.plugin.AbstractPluginMessageStore;
import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
import org.apache.rocketmq.store.queue.CqUnit;
import org.apache.rocketmq.store.queue.ReferredIterator;
Expand Down Expand Up @@ -470,16 +471,21 @@ private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext ctx, R
String requestTopic = requestHeader.getTopic();
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
response.setCode(ResponseCode.SUCCESS);

DefaultMessageStore messageStore = (DefaultMessageStore) brokerController.getMessageStore();
RocksDBMessageStore rocksDBMessageStore = messageStore.getRocksDBMessageStore();
if (!messageStore.getMessageStoreConfig().isRocksdbCQDoubleWriteEnable()) {
MessageStore messageStore = brokerController.getMessageStore();
DefaultMessageStore defaultMessageStore;
if (messageStore instanceof AbstractPluginMessageStore) {
defaultMessageStore = (DefaultMessageStore) ((AbstractPluginMessageStore) messageStore).getNext();
} else {
defaultMessageStore = (DefaultMessageStore) messageStore;
}
RocksDBMessageStore rocksDBMessageStore = defaultMessageStore.getRocksDBMessageStore();
if (!defaultMessageStore.getMessageStoreConfig().isRocksdbCQDoubleWriteEnable()) {
response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", "rocksdbCQWriteEnable is false, checkRocksdbCqWriteProgressCommand is invalid")));
return response;
}

ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> cqTable = messageStore.getConsumeQueueTable();
StringBuilder diffResult = new StringBuilder("check success, all is ok!\n");
ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> cqTable = defaultMessageStore.getConsumeQueueTable();
StringBuilder diffResult = new StringBuilder();
try {
if (StringUtils.isNotBlank(requestTopic)) {
processConsumeQueuesForTopic(cqTable.get(requestTopic), requestTopic, rocksDBMessageStore, diffResult,false);
Expand Down Expand Up @@ -516,15 +522,15 @@ private void processConsumeQueuesForTopic(ConcurrentMap<Integer, ConsumeQueueInt
Pair<CqUnit, Long> fileCqUnit = jsonCq.getCqUnitAndStoreTime(i);
Pair<CqUnit, Long> kvCqUnit = kvCq.getCqUnitAndStoreTime(i);
if (fileCqUnit == null || kvCqUnit == null) {
diffResult.append(String.format("[topic: %s, queue: %s, offset: %s] \n kv : %s \n file: %s \n",
diffResult.append(String.format("[topic: %s, queue: %s, offset: %s] \n kv : %s \n file : %s \n",
topic, queueId, i, kvCqUnit != null ? kvCqUnit.getObject1() : "null", fileCqUnit != null ? fileCqUnit.getObject1() : "null"));
return;
}
if (!checkCqUnitEqual(kvCqUnit.getObject1(), fileCqUnit.getObject1())) {
String diffInfo = String.format("[topic:%s, queue: %s offset: %s] \n file: %s \n kv: %s",
String diffInfo = String.format("[topic:%s, queue: %s offset: %s] \n file : %s \n kv : %s \n",
topic, queueId, i, kvCqUnit.getObject1(), fileCqUnit.getObject1());
LOGGER.error(diffInfo);
diffResult.append(diffInfo).append("\n");
diffResult.append(diffInfo).append(System.lineSeparator());
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,15 +428,15 @@ public class MessageStoreConfig {

private boolean rocksdbCQDoubleWriteEnable = false;

private boolean enableBatchWriteKvCq = true;
private int batchWriteKvCqSize = 16;


public boolean isEnableBatchWriteKvCq() {
return enableBatchWriteKvCq;
public int getBatchWriteKvCqSize() {
return batchWriteKvCqSize;
}

public void setEnableBatchWriteKvCq(boolean enableBatchWriteKvCq) {
this.enableBatchWriteKvCq = enableBatchWriteKvCq;
public void setBatchWriteKvCqSize(int batchWriteKvCqSize) {
this.batchWriteKvCqSize = batchWriteKvCqSize;
}

public boolean isRocksdbCQDoubleWriteEnable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -661,4 +661,8 @@ public void recoverTopicQueueTable() {
public void notifyMessageArriveIfNecessary(DispatchRequest dispatchRequest) {
next.notifyMessageArriveIfNecessary(dispatchRequest);
}

public MessageStore getNext() {
return next;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class RocksDBConsumeQueueStore extends AbstractConsumeQueueStore {
public static final byte CTRL_1 = '\u0001';
public static final byte CTRL_2 = '\u0002';

private static final int BATCH_SIZE = 16;
private final int batchSize;
public static final int MAX_KEY_LEN = 300;

private final ScheduledExecutorService scheduledExecutorService;
Expand All @@ -77,8 +77,6 @@ public class RocksDBConsumeQueueStore extends AbstractConsumeQueueStore {
private final Map<ByteBuffer, Pair<ByteBuffer, DispatchRequest>> tempTopicQueueMaxOffsetMap;
private volatile boolean isCQError = false;

private boolean enableBatchWriteKvCq;

public RocksDBConsumeQueueStore(DefaultMessageStore messageStore) {
super(messageStore);

Expand All @@ -88,11 +86,11 @@ public RocksDBConsumeQueueStore(DefaultMessageStore messageStore) {
this.rocksDBConsumeQueueOffsetTable = new RocksDBConsumeQueueOffsetTable(rocksDBConsumeQueueTable, rocksDBStorage, messageStore);

this.writeBatch = new WriteBatch();
this.enableBatchWriteKvCq = messageStoreConfig.isEnableBatchWriteKvCq();
this.bufferDRList = new ArrayList(BATCH_SIZE);
this.cqBBPairList = new ArrayList(BATCH_SIZE);
this.offsetBBPairList = new ArrayList(BATCH_SIZE);
for (int i = 0; i < BATCH_SIZE; i++) {
this.batchSize = messageStoreConfig.getBatchWriteKvCqSize();
this.bufferDRList = new ArrayList(batchSize);
this.cqBBPairList = new ArrayList(batchSize);
this.offsetBBPairList = new ArrayList(batchSize);
for (int i = 0; i < batchSize; i++) {
this.cqBBPairList.add(RocksDBConsumeQueueTable.getCQByteBufferPair());
this.offsetBBPairList.add(RocksDBConsumeQueueOffsetTable.getOffsetByteBufferPair());
}
Expand Down Expand Up @@ -166,12 +164,13 @@ private boolean shutdownInner() {

@Override
public void putMessagePositionInfoWrapper(DispatchRequest request) throws RocksDBException {
if (request == null || this.bufferDRList.size() >= batchSize) {
putMessagePosition();
}

if (request != null) {
this.bufferDRList.add(request);
}
if (request == null || !enableBatchWriteKvCq || this.bufferDRList.size() >= BATCH_SIZE) {
putMessagePosition();
}
}

public void putMessagePosition() throws RocksDBException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
import org.apache.rocketmq.tools.command.offset.ResetOffsetByTimeCommand;
import org.apache.rocketmq.tools.command.offset.SkipAccumulationSubCommand;
import org.apache.rocketmq.tools.command.producer.ProducerSubCommand;
import org.apache.rocketmq.tools.command.queue.CheckRocksdbCqWriteProgressCommand;
import org.apache.rocketmq.tools.command.queue.QueryConsumeQueueCommand;
import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand;
import org.apache.rocketmq.tools.command.topic.AllocateMQSubCommand;
Expand Down Expand Up @@ -304,6 +305,7 @@ public static void initCommand() {
initCommand(new ListAclSubCommand());
initCommand(new CopyAclsSubCommand());
initCommand(new RocksDBConfigToJsonCommand());
initCommand(new CheckRocksdbCqWriteProgressCommand());
}

private static void printHelp() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class CheckRocksdbCqWriteProgressCommand implements SubCommand {

@Override
public String commandName() {
return "checkRocksdbCqWriteProgressCommandCommand";
return "checkRocksdbCqWriteProgress";
}

@Override
Expand Down Expand Up @@ -82,9 +82,9 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
String brokerAddr = brokerData.getBrokerAddrs().get(0L);
CheckRocksdbCqWriteProgressResponseBody body = defaultMQAdminExt.checkRocksdbCqWriteProgress(brokerAddr, topic);
if (StringUtils.isNotBlank(topic)) {
System.out.printf(body.getDiffResult());
System.out.print(body.getDiffResult());
} else {
System.out.printf(brokerName + " | " + brokerAddr + " | " + body.getDiffResult());
System.out.print(brokerName + " | " + brokerAddr + " | \n" + body.getDiffResult());
}
}

Expand Down

0 comments on commit 3b5cbf8

Please sign in to comment.