Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #8698] Remove batch write in kv cq store and update rocksdb cq check tool #8739

Merged
merged 13 commits into from
Sep 25, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@
import org.apache.rocketmq.store.RocksDBMessageStore;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.plugin.AbstractPluginMessageStore;
import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
import org.apache.rocketmq.store.queue.CqUnit;
import org.apache.rocketmq.store.queue.ReferredIterator;
Expand Down Expand Up @@ -470,16 +471,21 @@ private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext ctx, R
String requestTopic = requestHeader.getTopic();
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
response.setCode(ResponseCode.SUCCESS);

DefaultMessageStore messageStore = (DefaultMessageStore) brokerController.getMessageStore();
RocksDBMessageStore rocksDBMessageStore = messageStore.getRocksDBMessageStore();
if (!messageStore.getMessageStoreConfig().isRocksdbCQDoubleWriteEnable()) {
MessageStore messageStore = brokerController.getMessageStore();
DefaultMessageStore defaultMessageStore;
if (messageStore instanceof AbstractPluginMessageStore) {
defaultMessageStore = (DefaultMessageStore) ((AbstractPluginMessageStore) messageStore).getNext();
} else {
defaultMessageStore = (DefaultMessageStore) messageStore;
}
RocksDBMessageStore rocksDBMessageStore = defaultMessageStore.getRocksDBMessageStore();
if (!defaultMessageStore.getMessageStoreConfig().isRocksdbCQDoubleWriteEnable()) {
response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", "rocksdbCQWriteEnable is false, checkRocksdbCqWriteProgressCommand is invalid")));
return response;
}

ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> cqTable = messageStore.getConsumeQueueTable();
StringBuilder diffResult = new StringBuilder("check success, all is ok!\n");
ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> cqTable = defaultMessageStore.getConsumeQueueTable();
StringBuilder diffResult = new StringBuilder();
try {
if (StringUtils.isNotBlank(requestTopic)) {
processConsumeQueuesForTopic(cqTable.get(requestTopic), requestTopic, rocksDBMessageStore, diffResult,false);
Expand Down Expand Up @@ -516,15 +522,15 @@ private void processConsumeQueuesForTopic(ConcurrentMap<Integer, ConsumeQueueInt
Pair<CqUnit, Long> fileCqUnit = jsonCq.getCqUnitAndStoreTime(i);
Pair<CqUnit, Long> kvCqUnit = kvCq.getCqUnitAndStoreTime(i);
if (fileCqUnit == null || kvCqUnit == null) {
diffResult.append(String.format("[topic: %s, queue: %s, offset: %s] \n kv : %s \n file: %s \n",
diffResult.append(String.format("[topic: %s, queue: %s, offset: %s] \n kv : %s \n file : %s \n",
topic, queueId, i, kvCqUnit != null ? kvCqUnit.getObject1() : "null", fileCqUnit != null ? fileCqUnit.getObject1() : "null"));
return;
}
if (!checkCqUnitEqual(kvCqUnit.getObject1(), fileCqUnit.getObject1())) {
String diffInfo = String.format("[topic:%s, queue: %s offset: %s] \n file: %s \n kv: %s",
String diffInfo = String.format("[topic:%s, queue: %s offset: %s] \n file : %s \n kv : %s \n",
topic, queueId, i, kvCqUnit.getObject1(), fileCqUnit.getObject1());
LOGGER.error(diffInfo);
diffResult.append(diffInfo).append("\n");
diffResult.append(diffInfo).append(System.lineSeparator());
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,15 +428,15 @@ public class MessageStoreConfig {

private boolean rocksdbCQDoubleWriteEnable = false;

private boolean enableBatchWriteKvCq = true;
private int batchWriteKvCqSize = 16;


public boolean isEnableBatchWriteKvCq() {
return enableBatchWriteKvCq;
public int getBatchWriteKvCqSize() {
return batchWriteKvCqSize;
}

public void setEnableBatchWriteKvCq(boolean enableBatchWriteKvCq) {
this.enableBatchWriteKvCq = enableBatchWriteKvCq;
public void setBatchWriteKvCqSize(int batchWriteKvCqSize) {
this.batchWriteKvCqSize = batchWriteKvCqSize;
}

public boolean isRocksdbCQDoubleWriteEnable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -661,4 +661,8 @@ public void recoverTopicQueueTable() {
public void notifyMessageArriveIfNecessary(DispatchRequest dispatchRequest) {
next.notifyMessageArriveIfNecessary(dispatchRequest);
}

public MessageStore getNext() {
return next;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class RocksDBConsumeQueueStore extends AbstractConsumeQueueStore {
public static final byte CTRL_1 = '\u0001';
public static final byte CTRL_2 = '\u0002';

private static final int BATCH_SIZE = 16;
private final int batchSize;
public static final int MAX_KEY_LEN = 300;

private final ScheduledExecutorService scheduledExecutorService;
Expand All @@ -77,8 +77,6 @@ public class RocksDBConsumeQueueStore extends AbstractConsumeQueueStore {
private final Map<ByteBuffer, Pair<ByteBuffer, DispatchRequest>> tempTopicQueueMaxOffsetMap;
private volatile boolean isCQError = false;

private boolean enableBatchWriteKvCq;

public RocksDBConsumeQueueStore(DefaultMessageStore messageStore) {
super(messageStore);

Expand All @@ -88,11 +86,11 @@ public RocksDBConsumeQueueStore(DefaultMessageStore messageStore) {
this.rocksDBConsumeQueueOffsetTable = new RocksDBConsumeQueueOffsetTable(rocksDBConsumeQueueTable, rocksDBStorage, messageStore);

this.writeBatch = new WriteBatch();
this.enableBatchWriteKvCq = messageStoreConfig.isEnableBatchWriteKvCq();
this.bufferDRList = new ArrayList(BATCH_SIZE);
this.cqBBPairList = new ArrayList(BATCH_SIZE);
this.offsetBBPairList = new ArrayList(BATCH_SIZE);
for (int i = 0; i < BATCH_SIZE; i++) {
this.batchSize = messageStoreConfig.getBatchWriteKvCqSize();
this.bufferDRList = new ArrayList(batchSize);
this.cqBBPairList = new ArrayList(batchSize);
this.offsetBBPairList = new ArrayList(batchSize);
for (int i = 0; i < batchSize; i++) {
this.cqBBPairList.add(RocksDBConsumeQueueTable.getCQByteBufferPair());
this.offsetBBPairList.add(RocksDBConsumeQueueOffsetTable.getOffsetByteBufferPair());
}
Expand Down Expand Up @@ -166,12 +164,13 @@ private boolean shutdownInner() {

@Override
public void putMessagePositionInfoWrapper(DispatchRequest request) throws RocksDBException {
if (request == null || this.bufferDRList.size() >= batchSize) {
putMessagePosition();
}

if (request != null) {
this.bufferDRList.add(request);
}
if (request == null || !enableBatchWriteKvCq || this.bufferDRList.size() >= BATCH_SIZE) {
putMessagePosition();
}
}

public void putMessagePosition() throws RocksDBException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
import org.apache.rocketmq.tools.command.offset.ResetOffsetByTimeCommand;
import org.apache.rocketmq.tools.command.offset.SkipAccumulationSubCommand;
import org.apache.rocketmq.tools.command.producer.ProducerSubCommand;
import org.apache.rocketmq.tools.command.queue.CheckRocksdbCqWriteProgressCommand;
import org.apache.rocketmq.tools.command.queue.QueryConsumeQueueCommand;
import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand;
import org.apache.rocketmq.tools.command.topic.AllocateMQSubCommand;
Expand Down Expand Up @@ -304,6 +305,7 @@ public static void initCommand() {
initCommand(new ListAclSubCommand());
initCommand(new CopyAclsSubCommand());
initCommand(new RocksDBConfigToJsonCommand());
initCommand(new CheckRocksdbCqWriteProgressCommand());
}

private static void printHelp() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class CheckRocksdbCqWriteProgressCommand implements SubCommand {

@Override
public String commandName() {
return "checkRocksdbCqWriteProgressCommandCommand";
return "checkRocksdbCqWriteProgress";
}

@Override
Expand Down Expand Up @@ -82,9 +82,9 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
String brokerAddr = brokerData.getBrokerAddrs().get(0L);
CheckRocksdbCqWriteProgressResponseBody body = defaultMQAdminExt.checkRocksdbCqWriteProgress(brokerAddr, topic);
if (StringUtils.isNotBlank(topic)) {
System.out.printf(body.getDiffResult());
System.out.print(body.getDiffResult());
} else {
System.out.printf(brokerName + " | " + brokerAddr + " | " + body.getDiffResult());
System.out.print(brokerName + " | " + brokerAddr + " | \n" + body.getDiffResult());
}
}

Expand Down
Loading