Skip to content

Commit

Permalink
[ISSUE #8429] Fix trace message loss when traffic is heavy (#8430)
Browse files Browse the repository at this point in the history
  • Loading branch information
LetLetMe authored Jul 29, 2024
1 parent 263f0fb commit 217fc8d
Show file tree
Hide file tree
Showing 6 changed files with 187 additions and 234 deletions.
11 changes: 11 additions & 0 deletions client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageQueue;
Expand Down Expand Up @@ -64,6 +65,8 @@ public class ClientConfig {
*/
private int persistConsumerOffsetInterval = 1000 * 5;
private long pullTimeDelayMillsWhenException = 1000;

private int traceMsgBatchNum = 10;
private boolean unitMode = false;
private String unitName;
private boolean decodeReadBody = Boolean.parseBoolean(System.getProperty(DECODE_READ_BODY, "true"));
Expand Down Expand Up @@ -127,6 +130,14 @@ public String buildMQClientId() {
return sb.toString();
}

public int getTraceMsgBatchNum() {
return traceMsgBatchNum;
}

public void setTraceMsgBatchNum(int traceMsgBatchNum) {
this.traceMsgBatchNum = traceMsgBatchNum;
}

public String getClientIP() {
return clientIP;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ public DefaultLitePullConsumer(RPCHook rpcHook) {
* Constructor specifying consumer group, RPC hook
*
* @param consumerGroup Consumer group.
* @param rpcHook RPC hook to execute before each remoting command.
* @param rpcHook RPC hook to execute before each remoting command.
*/
public DefaultLitePullConsumer(final String consumerGroup, RPCHook rpcHook) {
this.consumerGroup = consumerGroup;
Expand All @@ -213,7 +213,7 @@ public DefaultLitePullConsumer(final String consumerGroup, RPCHook rpcHook) {
* Constructor specifying namespace, consumer group and RPC hook.
*
* @param consumerGroup Consumer group.
* @param rpcHook RPC hook to execute before each remoting command.
* @param rpcHook RPC hook to execute before each remoting command.
*/
@Deprecated
public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) {
Expand Down Expand Up @@ -270,6 +270,7 @@ public void subscribe(String topic, MessageSelector messageSelector) throws MQCl
public void unsubscribe(String topic) {
this.defaultLitePullConsumerImpl.unsubscribe(withNamespace(topic));
}

@Override
public void assign(Collection<MessageQueue> messageQueues) {
defaultLitePullConsumerImpl.assign(queuesWithNamespace(messageQueues));
Expand Down Expand Up @@ -338,7 +339,8 @@ public void commit() {
this.defaultLitePullConsumerImpl.commitAll();
}

@Override public void commit(Map<MessageQueue, Long> offsetMap, boolean persist) {
@Override
public void commit(Map<MessageQueue, Long> offsetMap, boolean persist) {
this.defaultLitePullConsumerImpl.commit(offsetMap, persist);
}

Expand All @@ -361,11 +363,11 @@ public Set<MessageQueue> assignment() throws MQClientException {
* @param messageQueueListener
*/
@Override
public void subscribe(String topic, String subExpression, MessageQueueListener messageQueueListener) throws MQClientException {
public void subscribe(String topic, String subExpression,
MessageQueueListener messageQueueListener) throws MQClientException {
this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), subExpression, messageQueueListener);
}


@Override
public void commit(final Set<MessageQueue> messageQueues, boolean persist) {
this.defaultLitePullConsumerImpl.commit(messageQueues, persist);
Expand Down Expand Up @@ -589,7 +591,7 @@ public TraceDispatcher getTraceDispatcher() {
private void setTraceDispatcher() {
if (enableTrace) {
try {
AsyncTraceDispatcher traceDispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, traceTopic, rpcHook);
AsyncTraceDispatcher traceDispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, getTraceMsgBatchNum(), traceTopic, rpcHook);
traceDispatcher.getTraceProducer().setUseTLS(this.isUseTLS());
traceDispatcher.setNamespaceV2(namespaceV2);
this.traceDispatcher = traceDispatcher;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,9 @@
/**
* In most scenarios, this is the mostly recommended class to consume messages.
* </p>
*
* Technically speaking, this push client is virtually a wrapper of the underlying pull service. Specifically, on
* arrival of messages pulled from brokers, it roughly invokes the registered callback handler to feed the messages.
* </p>
*
* See quickstart/Consumer in the example module for a typical usage.
* </p>
*
Expand All @@ -76,29 +74,25 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve
* load balance. It's required and needs to be globally unique.
* </p>
*
* See <a href="https://rocketmq.apache.org/docs/introduction/02concepts">here</a> for further discussion.
*/
private String consumerGroup;

/**
* Message model defines the way how messages are delivered to each consumer clients.
* </p>
*
* RocketMQ supports two message models: clustering and broadcasting. If clustering is set, consumer clients with
* the same {@link #consumerGroup} would only consume shards of the messages subscribed, which achieves load
* balances; Conversely, if the broadcasting is set, each consumer client will consume all subscribed messages
* separately.
* </p>
*
* This field defaults to clustering.
*/
private MessageModel messageModel = MessageModel.CLUSTERING;

/**
* Consuming point on consumer booting.
* </p>
*
* There are three consuming points:
* <ul>
* <li>
Expand Down Expand Up @@ -239,7 +233,6 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*/
private int pullBatchSize = 32;


private int pullBatchSizeInBytes = 256 * 1024;

/**
Expand All @@ -256,7 +249,6 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* Max re-consume times.
* In concurrently mode, -1 means 16;
* In orderly mode, -1 means Integer.MAX_VALUE.
*
* If messages are re-consumed more than {@link #maxReconsumeTimes} before success.
*/
private int maxReconsumeTimes = -1;
Expand Down Expand Up @@ -312,7 +304,6 @@ public DefaultMQPushConsumer(final String consumerGroup) {
this(consumerGroup, null, new AllocateMessageQueueAveragely());
}


/**
* Constructor specifying RPC hook.
*
Expand All @@ -326,29 +317,29 @@ public DefaultMQPushConsumer(RPCHook rpcHook) {
* Constructor specifying consumer group, RPC hook.
*
* @param consumerGroup Consumer group.
* @param rpcHook RPC hook to execute before each remoting command.
* @param rpcHook RPC hook to execute before each remoting command.
*/
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook) {
this(consumerGroup, rpcHook, new AllocateMessageQueueAveragely());
}


/**
* Constructor specifying consumer group, enabled msg trace flag and customized trace topic name.
*
* @param consumerGroup Consumer group.
* @param enableMsgTrace Switch flag instance for message trace.
* @param consumerGroup Consumer group.
* @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name.
*/
public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace, final String customizedTraceTopic) {
public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace,
final String customizedTraceTopic) {
this(consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, customizedTraceTopic);
}

/**
* Constructor specifying consumer group, RPC hook and message queue allocating algorithm.
*
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy Message queue allocating algorithm.
*/
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
Expand All @@ -359,14 +350,15 @@ public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
/**
* Constructor specifying consumer group, RPC hook, message queue allocating algorithm, enabled msg trace flag and customized trace topic name.
*
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy message queue allocating algorithm.
* @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name.
* @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name.
*/
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) {
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace,
final String customizedTraceTopic) {
this.consumerGroup = consumerGroup;
this.rpcHook = rpcHook;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
Expand All @@ -378,7 +370,7 @@ public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
/**
* Constructor specifying namespace and consumer group.
*
* @param namespace Namespace for this MQ Producer instance.
* @param namespace Namespace for this MQ Producer instance.
* @param consumerGroup Consumer group.
*/
@Deprecated
Expand All @@ -389,9 +381,9 @@ public DefaultMQPushConsumer(final String namespace, final String consumerGroup)
/**
* Constructor specifying namespace, consumer group and RPC hook .
*
* @param namespace Namespace for this MQ Producer instance.
* @param namespace Namespace for this MQ Producer instance.
* @param consumerGroup Consumer group.
* @param rpcHook RPC hook to execute before each remoting command.
* @param rpcHook RPC hook to execute before each remoting command.
*/
@Deprecated
public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) {
Expand All @@ -401,9 +393,9 @@ public DefaultMQPushConsumer(final String namespace, final String consumerGroup,
/**
* Constructor specifying namespace, consumer group, RPC hook and message queue allocating algorithm.
*
* @param namespace Namespace for this MQ Producer instance.
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param namespace Namespace for this MQ Producer instance.
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy Message queue allocating algorithm.
*/
@Deprecated
Expand All @@ -419,16 +411,17 @@ public DefaultMQPushConsumer(final String namespace, final String consumerGroup,
/**
* Constructor specifying namespace, consumer group, RPC hook, message queue allocating algorithm, enabled msg trace flag and customized trace topic name.
*
* @param namespace Namespace for this MQ Producer instance.
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param namespace Namespace for this MQ Producer instance.
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy message queue allocating algorithm.
* @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name.
* @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name.
*/
@Deprecated
public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) {
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace,
final String customizedTraceTopic) {
this.consumerGroup = consumerGroup;
this.namespace = namespace;
this.rpcHook = rpcHook;
Expand All @@ -443,7 +436,8 @@ public DefaultMQPushConsumer(final String namespace, final String consumerGroup,
*/
@Deprecated
@Override
public void createTopic(String key, String newTopic, int queueNum, Map<String, String> attributes) throws MQClientException {
public void createTopic(String key, String newTopic, int queueNum,
Map<String, String> attributes) throws MQClientException {
createTopic(key, withNamespace(newTopic), queueNum, 0, null);
}

Expand All @@ -457,7 +451,8 @@ public void setUseTLS(boolean useTLS) {
*/
@Deprecated
@Override
public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag, Map<String, String> attributes) throws MQClientException {
public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag,
Map<String, String> attributes) throws MQClientException {
this.defaultMQPushConsumerImpl.createTopic(key, withNamespace(newTopic), queueNum, topicSysFlag);
}

Expand Down Expand Up @@ -677,16 +672,16 @@ public void setSubscription(Map<String, String> subscription) {

/**
* Send message back to broker which will be re-delivered in future.
*
* <p>
* This method will be removed or it's visibility will be changed in a certain version after April 5, 2020, so
* please do not use this method.
*
* @param msg Message to send back.
* @param msg Message to send back.
* @param delayLevel delay level.
* @throws RemotingException if there is any network-tier error.
* @throws MQBrokerException if there is any broker error.
* @throws RemotingException if there is any network-tier error.
* @throws MQBrokerException if there is any broker error.
* @throws InterruptedException if the thread is interrupted.
* @throws MQClientException if there is any client error.
* @throws MQClientException if there is any client error.
*/
@Deprecated
@Override
Expand All @@ -699,17 +694,17 @@ public void sendMessageBack(MessageExt msg, int delayLevel)
/**
* Send message back to the broker whose name is <code>brokerName</code> and the message will be re-delivered in
* future.
*
* <p>
* This method will be removed or it's visibility will be changed in a certain version after April 5, 2020, so
* please do not use this method.
*
* @param msg Message to send back.
* @param msg Message to send back.
* @param delayLevel delay level.
* @param brokerName broker name.
* @throws RemotingException if there is any network-tier error.
* @throws MQBrokerException if there is any broker error.
* @throws RemotingException if there is any network-tier error.
* @throws MQBrokerException if there is any broker error.
* @throws InterruptedException if the thread is interrupted.
* @throws MQClientException if there is any client error.
* @throws MQClientException if there is any client error.
*/
@Deprecated
@Override
Expand All @@ -735,7 +730,7 @@ public void start() throws MQClientException {
this.defaultMQPushConsumerImpl.start();
if (enableTrace) {
try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, traceTopic, rpcHook);
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, getTraceMsgBatchNum(), traceTopic, rpcHook);
dispatcher.setHostConsumer(this.defaultMQPushConsumerImpl);
dispatcher.setNamespaceV2(namespaceV2);
traceDispatcher = dispatcher;
Expand Down Expand Up @@ -799,9 +794,9 @@ public void registerMessageListener(MessageListenerOrderly messageListener) {
/**
* Subscribe a topic to consuming subscription.
*
* @param topic topic to subscribe.
* @param topic topic to subscribe.
* @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br>
* if null or * expression,meaning subscribe all
* if null or * expression,meaning subscribe all
* @throws MQClientException if there is any client error.
*/
@Override
Expand All @@ -812,8 +807,8 @@ public void subscribe(String topic, String subExpression) throws MQClientExcepti
/**
* Subscribe a topic to consuming subscription.
*
* @param topic topic to consume.
* @param fullClassName full class name,must extend org.apache.rocketmq.common.filter. MessageFilter
* @param topic topic to consume.
* @param fullClassName full class name,must extend org.apache.rocketmq.common.filter. MessageFilter
* @param filterClassSource class source code,used UTF-8 file encoding,must be responsible for your code safety
*/
@Override
Expand All @@ -824,7 +819,7 @@ public void subscribe(String topic, String fullClassName, String filterClassSour
/**
* Subscribe a topic by message selector.
*
* @param topic topic to consume.
* @param topic topic to consume.
* @param messageSelector {@link org.apache.rocketmq.client.consumer.MessageSelector}
* @see org.apache.rocketmq.client.consumer.MessageSelector#bySql
* @see org.apache.rocketmq.client.consumer.MessageSelector#byTag
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ public void start() throws MQClientException {
}
if (enableTrace) {
try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, traceTopic, rpcHook);
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, getTraceMsgBatchNum(), traceTopic, rpcHook);
dispatcher.setHostProducer(this.defaultMQProducerImpl);
dispatcher.setNamespaceV2(this.namespaceV2);
traceDispatcher = dispatcher;
Expand Down
Loading

0 comments on commit 217fc8d

Please sign in to comment.