Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #4099]Optimized the performance of sending traceMessage in AsyncTraceDispatcher #4180

Merged
merged 4 commits into from
Apr 27, 2022
Merged

Conversation

dugenkui03
Copy link
Contributor

@dugenkui03 dugenkui03 commented Apr 17, 2022

What is the purpose of the change

Details in #4099 (comment). Given the details for convenience.

Describe in Chinese

该 PR 目的是优化轨迹消息发送模块性能,详情讨论见 #4099 (comment) 。为 review 方便此处给出设计详情和背景。

当前逻辑

image

  1. 业务线程调用AsyncTraceDispatcher#append向任务元数据队列traceContextQueue添加TraceContext
  2. worker线程循环从traceContextQueue中获取TraceContext、并包装为任务AsyncAppenderRequest,由线程池traceExecutor执行;
  3. AsyncAppenderRequest#sendTraceData中将TraceContext转换为可计量大小的TraceTransferBean,并按照按照topic+regionIdTraceTransferBean进行分组、获取结构为Map<String, List<TraceTransferBean>>的数据;
  4. TraceTransferBean按照maxMsgSize进行分批次的发送,根据AsyncTraceDispatcher#accessChannel决定发送到rmq_sys_TRACE_DATA_${regionId}还是轨迹消息队列(默认为RMQ_SYS_TRACE_TOPIC) 。

当前逻辑的问题是worker通过AsyncAppenderRequest对数据进行了不必要的分割List<TraceTransferBean>,在AsyncAppenderRequest#sendTraceData中对数据也进行了不必要的分组。

优化思路

image

  1. work线程将数据按照topic分组、保存到HashMap<String, TraceDataSegment>中,TraceDataSegment中保存了TraceTransferBean列表、数据大小和当前列表第一个TraceTransferBean的保存时间;
  2. 将数据保存到TraceDataSegment中时会判断TraceTransferBean列表是否已经达到 maxMsgSize,是则发送数据、并恢复TraceDataSegment的初始值;
  3. work线程每pollingTimeMil毫秒会遍历所有的TraceDataSegment,检查其数据保存超过是否waitTimeThresholdMil毫秒,是则发送所有数据、以此避免时间保存过久不发送。

该算法的总体思路是在数据达到大小阈值maxMsgSize或者时间阈值时发送数据。可避免之前因为数据不恰当的分组导致过多的请求。

@codecov-commenter
Copy link

codecov-commenter commented Apr 17, 2022

Codecov Report

Merging #4180 (8c72734) into develop (fd554ab) will increase coverage by 0.07%.
The diff coverage is 83.87%.

@@              Coverage Diff              @@
##             develop    #4180      +/-   ##
=============================================
+ Coverage      47.92%   48.00%   +0.07%     
- Complexity      5002     5016      +14     
=============================================
  Files            634      635       +1     
  Lines          42529    42508      -21     
  Branches        5573     5563      -10     
=============================================
+ Hits           20381    20404      +23     
+ Misses         19647    19616      -31     
+ Partials        2501     2488      -13     
Impacted Files Coverage Δ
...he/rocketmq/client/trace/AsyncTraceDispatcher.java 82.43% <83.87%> (+2.73%) ⬆️
...ketmq/common/protocol/body/ConsumerConnection.java 95.83% <0.00%> (-4.17%) ⬇️
...ava/org/apache/rocketmq/test/util/VerifyUtils.java 46.26% <0.00%> (-2.99%) ⬇️
...lient/impl/consumer/DefaultMQPushConsumerImpl.java 41.05% <0.00%> (-2.99%) ⬇️
...ava/org/apache/rocketmq/filter/util/BitsArray.java 59.82% <0.00%> (-2.57%) ⬇️
...mq/client/impl/consumer/RebalanceLitePullImpl.java 72.05% <0.00%> (-1.48%) ⬇️
...e/rocketmq/client/impl/consumer/RebalanceImpl.java 43.75% <0.00%> (-1.18%) ⬇️
...mq/client/impl/producer/DefaultMQProducerImpl.java 44.72% <0.00%> (-0.87%) ⬇️
...nt/impl/consumer/ConsumeMessageOrderlyService.java 49.64% <0.00%> (-0.71%) ⬇️
...ain/java/org/apache/rocketmq/store/MappedFile.java 51.05% <0.00%> (-0.71%) ⬇️
... and 37 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update fd554ab...8c72734. Read the comment docs.

@coveralls
Copy link

Coverage Status

Coverage increased (+0.04%) to 52.017% when pulling 64488bd on dugenkui03:patch-007 into 50e314e on apache:develop.

@coveralls
Copy link

coveralls commented Apr 17, 2022

Coverage Status

Coverage decreased (-0.1%) to 51.858% when pulling 8c72734 on dugenkui03:patch-007 into 50e314e on apache:develop.

@dugenkui03 dugenkui03 changed the title [ISSUE #4099]Optimized the performance of sending traceMessage [ISSUE #4099]Optimized the performance of sending traceMessage in **AsyncTraceDispatcher** Apr 20, 2022
@dugenkui03 dugenkui03 changed the title [ISSUE #4099]Optimized the performance of sending traceMessage in **AsyncTraceDispatcher** [ISSUE #4099]Optimized the performance of sending traceMessage in AsyncTraceDispatcher Apr 20, 2022
}

@Override
public void run() {
StringBuilder buffer = new StringBuilder(1024);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a good habit to catch the unchecked exception in runnable, if we do not get the future result, for the ThreadPool will swallow the exception, and we get no information if something unexpected happens.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a try catch in the following invoked method sendTraceDataByMQ(), is this enough?

image

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, it is enough since the code outside the try-catch is simple, but it may introduce vulnerabilities if someone else adds code to it in the future.
It is better to show the best practice if you have spare time.

@@ -358,7 +382,7 @@ private void flushData(List<TraceTransferBean> transBeanList, String dataTopic,
* @param keySet the keyset in this batch(including msgId in original message not offsetMsgId)
* @param data the message trace data in this batch
*/
private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic, String regionId) {
private void sendTraceDataByMQ(Set<String> keySet, final String data, String regionId) {
String traceTopic = traceTopicName;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the topic is generated by getTraceTopicName during the previous process.
Here is better to just use the topic generated before, and do not generate it again.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks a Lot. I do want to use the topicName generated in previous process, but forget.

@dugenkui03 dugenkui03 requested a review from dongeforever April 25, 2022 02:41
@dugenkui03
Copy link
Contributor Author

@dongeforever Thanks for your review, this pr is updated, please help to review.

}

@Override
public void run() {
StringBuilder buffer = new StringBuilder(1024);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, it is enough since the code outside the try-catch is simple, but it may introduce vulnerabilities if someone else adds code to it in the future.
It is better to show the best practice if you have spare time.

@duhenglucky duhenglucky merged commit b6e65a8 into apache:develop Apr 27, 2022
@duhenglucky duhenglucky added this to the 4.9.4 milestone May 19, 2022
GenerousMan pushed a commit to GenerousMan/rocketmq that referenced this pull request Aug 12, 2022
…in `AsyncTraceDispatcher` (apache#4180)

* Optimized the performance of sending traceMessage

* Optimized the performance of sending traceMessage

* Optimized the performance of sending traceMessage

* Optimized the performance of sending traceMessage
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants