Skip to content

Commit

Permalink
[ISSUE #8460] add a switch [skipWhenCKRePutReachMaxTimes] whether to …
Browse files Browse the repository at this point in the history
…continue to rewrite CK after max times
  • Loading branch information
imzs committed Aug 5, 2024
1 parent 97895ed commit faff309
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -568,9 +568,9 @@ private void reviveMsgFromCk(PopCheckPoint popCheckPoint) {

private void rePutCK(PopCheckPoint oldCK, Pair<Long, Boolean> pair) {
int rePutTimes = oldCK.parseRePutTimes();
if (rePutTimes >= ckRewriteIntervalsInSeconds.length) {
POP_LOGGER.warn("rePut CK reach max times, drop it. {}, {}, {}, {}-{}, {}, {}", oldCK.getTopic(), oldCK.getCId(),
oldCK.getBrokerName(), oldCK.getQueueId(), pair.getObject1(), oldCK.getPopTime(), oldCK.getInvisibleTime());
if (rePutTimes >= ckRewriteIntervalsInSeconds.length && brokerController.getBrokerConfig().isSkipWhenCKRePutReachMaxTimes()) {
POP_LOGGER.warn("rePut CK reach max times, drop it. {}, {}, {}, {}-{}, {}, {}, {}", oldCK.getTopic(), oldCK.getCId(),
oldCK.getBrokerName(), oldCK.getQueueId(), pair.getObject1(), oldCK.getPopTime(), oldCK.getInvisibleTime(), rePutTimes);
return;
}

Expand All @@ -588,7 +588,8 @@ private void rePutCK(PopCheckPoint oldCK, Pair<Long, Boolean> pair) {
newCk.setRePutTimes(String.valueOf(rePutTimes + 1)); // always increment even if removed from reviveRequestMap
if (oldCK.getReviveTime() <= System.currentTimeMillis()) {
// never expect an ACK matched in the future, we just use it to rewrite CK and try to revive retry message next time
newCk.setInvisibleTime(oldCK.getInvisibleTime() + ckRewriteIntervalsInSeconds[rePutTimes] * 1000);
int intervalIndex = rePutTimes >= ckRewriteIntervalsInSeconds.length ? ckRewriteIntervalsInSeconds.length - 1 : rePutTimes;
newCk.setInvisibleTime(oldCK.getInvisibleTime() + ckRewriteIntervalsInSeconds[intervalIndex] * 1000);
}
MessageExtBrokerInner ckMsg = brokerController.getPopMessageProcessor().buildCkMsg(newCk, queueId);
brokerController.getMessageStore().putMessage(ckMsg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ public void before() {
brokerConfig = new BrokerConfig();
brokerConfig.setBrokerClusterName(CLUSTER_NAME);
when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
when(brokerController.getMessageStore()).thenReturn(messageStore);
when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
Expand Down Expand Up @@ -285,6 +284,7 @@ public void testReviveMsgFromCk_messageFound_writeRetryFailed_rewriteCK() throws

@Test
public void testReviveMsgFromCk_messageFound_writeRetryFailed_rewriteCK_end() throws Throwable {
brokerConfig.setSkipWhenCKRePutReachMaxTimes(true);
PopCheckPoint ck = buildPopCheckPoint(0, 0, 0);
ck.setRePutTimes("17");
PopReviveService.ConsumeReviveObj reviveObj = new PopReviveService.ConsumeReviveObj();
Expand All @@ -306,6 +306,30 @@ public void testReviveMsgFromCk_messageFound_writeRetryFailed_rewriteCK_end() th
verify(messageStore, times(0)).putMessage(any(MessageExtBrokerInner.class)); // rewrite CK
}

@Test
public void testReviveMsgFromCk_messageFound_writeRetryFailed_rewriteCK_noEnd() throws Throwable {
brokerConfig.setSkipWhenCKRePutReachMaxTimes(false);
PopCheckPoint ck = buildPopCheckPoint(0, 0, 0);
ck.setRePutTimes(Byte.MAX_VALUE + "");
PopReviveService.ConsumeReviveObj reviveObj = new PopReviveService.ConsumeReviveObj();
reviveObj.map.put("", ck);
reviveObj.endTime = System.currentTimeMillis();
StringBuilder actualRetryTopic = new StringBuilder();

when(escapeBridge.getMessageAsync(anyString(), anyLong(), anyInt(), anyString(), anyBoolean()))
.thenReturn(CompletableFuture.completedFuture(Triple.of(new MessageExt(), "", false)));
when(escapeBridge.putMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenAnswer(invocation -> {
MessageExtBrokerInner msg = invocation.getArgument(0);
actualRetryTopic.append(msg.getTopic());
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED));
});

popReviveService.mergeAndRevive(reviveObj);
Assert.assertEquals(KeyBuilder.buildPopRetryTopic(TOPIC, GROUP, false), actualRetryTopic.toString());
verify(escapeBridge, times(1)).putMessageToSpecificQueue(any(MessageExtBrokerInner.class)); // write retry
verify(messageStore, times(1)).putMessage(any(MessageExtBrokerInner.class)); // rewrite CK
}

@Test
public void testReviveMsgFromCk_messageNotFound_noRetry() throws Throwable {
PopCheckPoint ck = buildPopCheckPoint(0, 0, 0);
Expand Down Expand Up @@ -349,6 +373,7 @@ public void testReviveMsgFromCk_messageNotFound_needRetry() throws Throwable {

@Test
public void testReviveMsgFromCk_messageNotFound_needRetry_end() throws Throwable {
brokerConfig.setSkipWhenCKRePutReachMaxTimes(true);
PopCheckPoint ck = buildPopCheckPoint(0, 0, 0);
ck.setRePutTimes("17");
PopReviveService.ConsumeReviveObj reviveObj = new PopReviveService.ConsumeReviveObj();
Expand All @@ -363,6 +388,23 @@ public void testReviveMsgFromCk_messageNotFound_needRetry_end() throws Throwable
verify(messageStore, times(0)).putMessage(any(MessageExtBrokerInner.class)); // rewrite CK
}

@Test
public void testReviveMsgFromCk_messageNotFound_needRetry_noEnd() throws Throwable {
brokerConfig.setSkipWhenCKRePutReachMaxTimes(false);
PopCheckPoint ck = buildPopCheckPoint(0, 0, 0);
ck.setRePutTimes(Byte.MAX_VALUE + "");
PopReviveService.ConsumeReviveObj reviveObj = new PopReviveService.ConsumeReviveObj();
reviveObj.map.put("", ck);
reviveObj.endTime = System.currentTimeMillis();

when(escapeBridge.getMessageAsync(anyString(), anyLong(), anyInt(), anyString(), anyBoolean()))
.thenReturn(CompletableFuture.completedFuture(Triple.of(null, "", true)));

popReviveService.mergeAndRevive(reviveObj);
verify(escapeBridge, times(0)).putMessageToSpecificQueue(any(MessageExtBrokerInner.class)); // write retry
verify(messageStore, times(1)).putMessage(any(MessageExtBrokerInner.class)); // rewrite CK
}

public static PopCheckPoint buildPopCheckPoint(long startOffset, long popTime, long reviveOffset) {
PopCheckPoint ck = new PopCheckPoint();
ck.setStartOffset(startOffset);
Expand Down
11 changes: 11 additions & 0 deletions common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,9 @@ public class BrokerConfig extends BrokerIdentity {
*/
private String configBlackList = "configBlackList;brokerConfigPath";

// if false, will still rewrite ck after max times 17
private boolean skipWhenCKRePutReachMaxTimes = false;

public String getConfigBlackList() {
return configBlackList;
}
Expand Down Expand Up @@ -1826,4 +1829,12 @@ public boolean isEnablePopMessageThreshold() {
public void setEnablePopMessageThreshold(boolean enablePopMessageThreshold) {
this.enablePopMessageThreshold = enablePopMessageThreshold;
}

public boolean isSkipWhenCKRePutReachMaxTimes() {
return skipWhenCKRePutReachMaxTimes;
}

public void setSkipWhenCKRePutReachMaxTimes(boolean skipWhenCKRePutReachMaxTimes) {
this.skipWhenCKRePutReachMaxTimes = skipWhenCKRePutReachMaxTimes;
}
}

0 comments on commit faff309

Please sign in to comment.