Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #8460] Improve the pop revive process when reading biz messages from a remote broker #8475

Merged
merged 3 commits into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions broker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ GenTestRules(
exclude_tests = [
# These tests are extremely slow and flaky, exclude them before they are properly fixed.
"src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest",
"src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest",
],
deps = [
":tests",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
import org.apache.rocketmq.client.consumer.PullStatus;
Expand All @@ -34,7 +36,6 @@
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageConst;
Expand All @@ -47,7 +48,6 @@
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
Expand Down Expand Up @@ -263,34 +263,29 @@ private PutMessageResult transformSendResult2PutResult(SendResult sendResult) {
}
}

public Pair<GetMessageStatus, MessageExt> getMessage(String topic, long offset, int queueId, String brokerName, boolean deCompressBody) {
public Triple<MessageExt, String, Boolean> getMessage(String topic, long offset, int queueId, String brokerName, boolean deCompressBody) {
return getMessageAsync(topic, offset, queueId, brokerName, deCompressBody).join();
}

public CompletableFuture<Pair<GetMessageStatus, MessageExt>> getMessageAsync(String topic, long offset, int queueId, String brokerName, boolean deCompressBody) {
// Triple<MessageExt, info, needRetry>, check info and retry if and only if MessageExt is null
public CompletableFuture<Triple<MessageExt, String, Boolean>> getMessageAsync(String topic, long offset, int queueId, String brokerName, boolean deCompressBody) {
MessageStore messageStore = brokerController.getMessageStoreByBrokerName(brokerName);
if (messageStore != null) {
return messageStore.getMessageAsync(innerConsumerGroupName, topic, queueId, offset, 1, null)
.thenApply(result -> {
if (result == null) {
LOG.warn("getMessageResult is null , innerConsumerGroupName {}, topic {}, offset {}, queueId {}", innerConsumerGroupName, topic, offset, queueId);
return new Pair<>(GetMessageStatus.MESSAGE_WAS_REMOVING, null);
return Triple.of(null, "getMessageResult is null", false); // local store, so no retry
}
List<MessageExt> list = decodeMsgList(result, deCompressBody);
if (list == null || list.isEmpty()) {
LOG.warn("Can not get msg , topic {}, offset {}, queueId {}, result is {}", topic, offset, queueId, result);
return new Pair<>(result.getStatus(), null);
return Triple.of(null, "Can not get msg", false); // local store, so no retry
}
return new Pair<>(result.getStatus(), list.get(0));
return Triple.of(list.get(0), "", false);
});
} else {
return getMessageFromRemoteAsync(topic, offset, queueId, brokerName)
.thenApply(msg -> {
if (msg == null) {
return new Pair<>(GetMessageStatus.MESSAGE_WAS_REMOVING, null);
}
return new Pair<>(GetMessageStatus.FOUND, msg);
});
return getMessageFromRemoteAsync(topic, offset, queueId, brokerName);
}
}

Expand Down Expand Up @@ -322,35 +317,38 @@ protected List<MessageExt> decodeMsgList(GetMessageResult getMessageResult, bool
return foundList;
}

protected MessageExt getMessageFromRemote(String topic, long offset, int queueId, String brokerName) {
protected Triple<MessageExt, String, Boolean> getMessageFromRemote(String topic, long offset, int queueId, String brokerName) {
return getMessageFromRemoteAsync(topic, offset, queueId, brokerName).join();
}

protected CompletableFuture<MessageExt> getMessageFromRemoteAsync(String topic, long offset, int queueId, String brokerName) {
// Triple<MessageExt, info, needRetry>, check info and retry if and only if MessageExt is null
protected CompletableFuture<Triple<MessageExt, String, Boolean>> getMessageFromRemoteAsync(String topic, long offset, int queueId, String brokerName) {
try {
String brokerAddr = this.brokerController.getTopicRouteInfoManager().findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, false);
if (null == brokerAddr) {
this.brokerController.getTopicRouteInfoManager().updateTopicRouteInfoFromNameServer(topic, true, false);
brokerAddr = this.brokerController.getTopicRouteInfoManager().findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, false);

if (null == brokerAddr) {
LOG.warn("can't find broker address for topic {}", topic);
return CompletableFuture.completedFuture(null);
LOG.warn("can't find broker address for topic {}, {}", topic, brokerName);
return CompletableFuture.completedFuture(Triple.of(null, "brokerAddress not found", true)); // maybe offline temporarily, so need retry
}
}

return this.brokerController.getBrokerOuterAPI().pullMessageFromSpecificBrokerAsync(brokerName,
brokerAddr, this.innerConsumerGroupName, topic, queueId, offset, 1, DEFAULT_PULL_TIMEOUT_MILLIS)
.thenApply(pullResult -> {
if (pullResult.getPullStatus().equals(PullStatus.FOUND) && !pullResult.getMsgFoundList().isEmpty()) {
return pullResult.getMsgFoundList().get(0);
if (pullResult.getLeft() != null
&& PullStatus.FOUND.equals(pullResult.getLeft().getPullStatus())
&& CollectionUtils.isNotEmpty(pullResult.getLeft().getMsgFoundList())) {
return Triple.of(pullResult.getLeft().getMsgFoundList().get(0), "", false);
}
return null;
return Triple.of(null, pullResult.getMiddle(), pullResult.getRight());
});
} catch (Exception e) {
LOG.error("Get message from remote failed.", e);
LOG.error("Get message from remote failed. {}, {}, {}, {}", topic, offset, queueId, brokerName, e);
}

return CompletableFuture.completedFuture(null);
return CompletableFuture.completedFuture(Triple.of(null, "Get message from remote failed", true)); // need retry
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.auth.config.AuthConfig;
Expand Down Expand Up @@ -1378,7 +1379,8 @@ public void run0() {
});
}

public CompletableFuture<PullResult> pullMessageFromSpecificBrokerAsync(String brokerName, String brokerAddr,
// Triple<PullResult, info, needRetry>, should check info and retry if and only if PullResult is null
public CompletableFuture<Triple<PullResult, String, Boolean>> pullMessageFromSpecificBrokerAsync(String brokerName, String brokerAddr,
String consumerGroup, String topic, int queueId, long offset,
int maxNums, long timeoutMillis) throws RemotingException, InterruptedException {
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
Expand All @@ -1397,7 +1399,7 @@ public CompletableFuture<PullResult> pullMessageFromSpecificBrokerAsync(String b
requestHeader.setBrokerName(brokerName);

RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
CompletableFuture<PullResult> pullResultFuture = new CompletableFuture<>();
CompletableFuture<Triple<PullResult, String, Boolean>> pullResultFuture = new CompletableFuture<>();
this.remotingClient.invokeAsync(brokerAddr, request, timeoutMillis, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
Expand All @@ -1409,15 +1411,16 @@ public void operationSucceed(RemotingCommand response) {
try {
PullResultExt pullResultExt = processPullResponse(response, brokerAddr);
processPullResult(pullResultExt, brokerName, queueId);
pullResultFuture.complete(pullResultExt);
pullResultFuture.complete(Triple.of(pullResultExt, pullResultExt.getPullStatus().name(), false)); // found or not found really, so no retry
} catch (Exception e) {
pullResultFuture.complete(new PullResult(PullStatus.NO_MATCHED_MSG, -1, -1, -1, new ArrayList<>()));
// retry when NO_PERMISSION, SUBSCRIPTION_GROUP_NOT_EXIST etc. even when TOPIC_NOT_EXIST
pullResultFuture.complete(Triple.of(null, "Response Code:" + response.getCode(), true));
}
}

@Override
public void operationFail(Throwable throwable) {
pullResultFuture.complete(new PullResult(PullStatus.NO_MATCHED_MSG, -1, -1, -1, new ArrayList<>()));
pullResultFuture.complete(Triple.of(null, throwable.getMessage(), true));
}
});
return pullResultFuture;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
import org.apache.rocketmq.broker.metrics.PopMetricsManager;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.PopAckConstants;
import org.apache.rocketmq.common.ServiceThread;
Expand All @@ -51,7 +53,6 @@
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.AppendMessageStatus;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.pop.AckMsg;
import org.apache.rocketmq.store.pop.BatchAckMsg;
Expand All @@ -63,6 +64,7 @@

public class PopReviveService extends ServiceThread {
private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
private final int[] ckRewriteIntervalsInSeconds = new int[] { 10, 20, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200 };

private int queueId;
private BrokerController brokerController;
Expand Down Expand Up @@ -196,7 +198,8 @@ private boolean reachTail(PullResult pullResult, long offset) {
|| pullResult.getPullStatus() == PullStatus.OFFSET_ILLEGAL && offset == pullResult.getMaxOffset();
}

private CompletableFuture<Pair<GetMessageStatus, MessageExt>> getBizMessage(String topic, long offset, int queueId,
// Triple<MessageExt, info, needRetry>
private CompletableFuture<Triple<MessageExt, String, Boolean>> getBizMessage(String topic, long offset, int queueId,
String brokerName) {
return this.brokerController.getEscapeBridge().getMessageAsync(topic, offset, queueId, brokerName, false);
}
Expand Down Expand Up @@ -491,6 +494,8 @@ protected void mergeAndRevive(ConsumeReviveObj consumeReviveObj) throws Throwabl
PopCheckPoint oldCK = inflightReviveRequestMap.firstKey();
rePutCK(oldCK, pair);
inflightReviveRequestMap.remove(oldCK);
POP_LOGGER.warn("stay too long, remove from reviveRequestMap, {}, {}, {}, {}", popCheckPoint.getTopic(),
popCheckPoint.getBrokerName(), popCheckPoint.getQueueId(), popCheckPoint.getStartOffset());
}
}

Expand Down Expand Up @@ -524,22 +529,12 @@ private void reviveMsgFromCk(PopCheckPoint popCheckPoint) {
// retry msg
long msgOffset = popCheckPoint.ackOffsetByIndex((byte) j);
CompletableFuture<Pair<Long, Boolean>> future = getBizMessage(popCheckPoint.getTopic(), msgOffset, popCheckPoint.getQueueId(), popCheckPoint.getBrokerName())
.thenApply(resultPair -> {
GetMessageStatus getMessageStatus = resultPair.getObject1();
MessageExt message = resultPair.getObject2();
.thenApply(rst -> {
MessageExt message = rst.getLeft();
if (message == null) {
POP_LOGGER.debug("reviveQueueId={}, can not get biz msg topic is {}, offset is {}, then continue",
queueId, popCheckPoint.getTopic(), msgOffset);
switch (getMessageStatus) {
case MESSAGE_WAS_REMOVING:
case OFFSET_TOO_SMALL:
case NO_MATCHED_LOGIC_QUEUE:
case NO_MESSAGE_IN_QUEUE:
return new Pair<>(msgOffset, true);
default:
return new Pair<>(msgOffset, false);

}
POP_LOGGER.info("reviveQueueId={}, can not get biz msg, topic:{}, qid:{}, offset:{}, brokerName:{}, info:{}, retry:{}, then continue",
queueId, popCheckPoint.getTopic(), popCheckPoint.getQueueId(), msgOffset, popCheckPoint.getBrokerName(), UtilAll.frontStringAtLeast(rst.getMiddle(), 60), rst.getRight());
return new Pair<>(msgOffset, !rst.getRight()); // Pair.object2 means OK or not, Triple.right value means needRetry
}
boolean result = reviveRetry(popCheckPoint, message);
return new Pair<>(msgOffset, result);
Expand Down Expand Up @@ -572,6 +567,13 @@ private void reviveMsgFromCk(PopCheckPoint popCheckPoint) {
}

private void rePutCK(PopCheckPoint oldCK, Pair<Long, Boolean> pair) {
int rePutTimes = oldCK.parseRePutTimes();
if (rePutTimes >= ckRewriteIntervalsInSeconds.length) {
POP_LOGGER.warn("rePut CK reach max times, drop it. {}, {}, {}, {}-{}, {}, {}", oldCK.getTopic(), oldCK.getCId(),
oldCK.getBrokerName(), oldCK.getQueueId(), pair.getObject1(), oldCK.getPopTime(), oldCK.getInvisibleTime());
return;
}

PopCheckPoint newCk = new PopCheckPoint();
newCk.setBitMap(0);
newCk.setNum((byte) 1);
Expand All @@ -583,6 +585,11 @@ private void rePutCK(PopCheckPoint oldCK, Pair<Long, Boolean> pair) {
newCk.setQueueId(oldCK.getQueueId());
newCk.setBrokerName(oldCK.getBrokerName());
newCk.addDiff(0);
newCk.setRePutTimes(String.valueOf(rePutTimes + 1)); // always increment even if removed from reviveRequestMap
if (oldCK.getReviveTime() <= System.currentTimeMillis()) {
// never expect an ACK matched in the future, we just use it to rewrite CK and try to revive retry message next time
newCk.setInvisibleTime(oldCK.getInvisibleTime() + ckRewriteIntervalsInSeconds[rePutTimes] * 1000);
}
MessageExtBrokerInner ckMsg = brokerController.getPopMessageProcessor().buildCkMsg(newCk, queueId);
brokerController.getMessageStore().putMessage(ckMsg);
}
Expand Down
Loading
Loading