Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #33]perf connecting storm when restart broker #44

Merged
merged 7 commits into from
Aug 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public DeFiBrokerController(BrokerConfig brokerConfig, NettyServerConfig nettySe

ConsumerIdsChangeListener consumerIdsChangeListener = (ConsumerIdsChangeListener) ReflectUtil.getSimpleProperty(BrokerController.class, this, "consumerIdsChangeListener");
AdjustQueueNumStrategy adjustQueueNumStrategy = new AdjustQueueNumStrategy(this);
consumerManager = new DeFiConsumerManager(consumerIdsChangeListener, adjustQueueNumStrategy);
consumerManager = new DeFiConsumerManager(consumerIdsChangeListener, adjustQueueNumStrategy,deFiBusBrokerConfig);

this.deFiManageExecutor =
Executors.newFixedThreadPool(brokerConfig.getClientManageThreadPoolNums(), new ThreadFactoryImpl(
Expand Down Expand Up @@ -209,6 +209,7 @@ public void shutdown() {
deFiManageExecutor.shutdown();
sendReplyMessageExecutor.shutdown();
pushReplyMessageExecutor.shutdown();
consumerManager.getNotifyClientExecutor().shutdown();

deFiScheduledExecutorService.shutdown();
sendReplyScheduledExecutorService.shutdown();
Expand Down Expand Up @@ -275,11 +276,12 @@ public DeFiProducerManager getProducerManager() {

@Override
public void printWaterMark() {
LOG_WATER_MARK.info("{\"SendQueueSize\":\"{}\",\"PullQueueSize\":\"{}\",\"GotQueueSize\":\"{}\",\"PushQueueSize\":\"{}\",\"SendSlowTimeMills\":\"{}\",\"PullSlowTimeMills\":\"{}\",\"HeartbeatQueueSize\":\"{}\"}",
LOG_WATER_MARK.info("{\"SendQueueSize\":\"{}\",\"PullQueueSize\":\"{}\",\"GotQueueSize\":\"{}\",\"PushQueueSize\":\"{}\",\"NotifyClientQueueSize\":\"{}\",\"SendSlowTimeMills\":\"{}\",\"PullSlowTimeMills\":\"{}\",\"HeartbeatQueueSize\":\"{}\"}",
this.getSendThreadPoolQueue().size(),
this.getPullThreadPoolQueue().size(),
this.sendReplyThreadPoolQueue.size(),
this.pushReplyThreadPoolQueue.size(),
consumerManager.getNotifyClientThreadPoolQueue().size(),
this.headSlowTimeMills4SendThreadPoolQueue(),
this.headSlowTimeMills4PullThreadPoolQueue(),
this.getHeartbeatThreadPoolQueue().size());
Expand Down Expand Up @@ -321,4 +323,5 @@ public MessageRedirectManager getMessageRedirectManager() {
public ClientRebalanceResultManager getClientRebalanceResultManager() {
return clientRebalanceResultManager;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,11 @@ public static DeFiBrokerController createBrokerController(String[] args) {

messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);

LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");
// LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
// JoranConfigurator configurator = new JoranConfigurator();
// configurator.setContext(lc);
// lc.reset();
// configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");
log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);

MixAll.printObjectProperties(log, brokerConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.webank.defibus.broker.DeFiBrokerController;
import com.webank.defibus.common.DeFiBusConstant;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
Expand All @@ -44,6 +45,9 @@ public class AdjustQueueNumStrategy {
private final DeFiBrokerController deFiBrokerController;
private final ScheduledThreadPoolExecutor autoScaleQueueSizeExecutorService;

private final String dedupKeyPrefixForQueueScale = "QS$";
private final ConcurrentHashMap<String,String> dedupMapForQueueScale = new ConcurrentHashMap<>();

public AdjustQueueNumStrategy(final DeFiBrokerController deFiBrokerController) {
this.deFiBrokerController = deFiBrokerController;

Expand Down Expand Up @@ -92,11 +96,20 @@ private void adjustQueueNumByConsumerCount(String topic, AdjustType scaleType) {
}

private void adjustReadQueueNumByConsumerCount(String topic, long delayMills, AdjustType mode) {
String dedupKey = dedupKeyPrefixForQueueScale + "R$" + mode + "$" + topic;
String old = dedupMapForQueueScale.putIfAbsent(dedupKey,topic);
//若old不为空,表示该topic存在即将执行的任务,本次任务可以忽略
//若不存在,表示该topic的任务正在执行,或者没有任务,本次不可忽略
if(StringUtils.isNotBlank(old)) {
return;
}

Runnable scaleQueueTask = new Runnable() {
private int alreadyRetryTimes = 0;

@Override
public void run() {
dedupMapForQueueScale.remove(dedupKey);
TopicConfig topicConfig = deFiBrokerController.getTopicConfigManager().getTopicConfigTable().get(topic);
if (topicConfig != null) {
synchronized (topicConfig) {
Expand All @@ -118,7 +131,7 @@ public void run() {
log.info("try adjust read queue size to {} for [{}], prev: {}, {}", adjustReadQueueSize, topic, topicConfig.getReadQueueNums(), mode);
if (adjustReadQueueSize < topicConfig.getWriteQueueNums()) {
log.info("adjust read queues to {} for [{}] fail. read queue size can't less than write queue size[{}]. {}",
adjustReadQueueSize, topic, topicConfig.getWriteQueueNums(), mode);
adjustReadQueueSize, topic, topicConfig.getWriteQueueNums(), mode);
return;
}
boolean canAdjustReadQueueSize = isCanAdjustReadQueueSize(topic, adjustReadQueueSize);
Expand Down Expand Up @@ -157,9 +170,18 @@ public void run() {
}

private void adjustWriteQueueNumByConsumerCount(String topic, long delayMills, AdjustType mode) {
String dedupKey = dedupKeyPrefixForQueueScale + "W$" + mode + "$" + topic;
String old = dedupMapForQueueScale.putIfAbsent(dedupKey,topic);
//若old不为空,表示该topic存在即将执行的任务,本次任务可以忽略
//若不存在,表示该topic的任务正在执行,或者没有任务,本次不可忽略
if(StringUtils.isNotBlank(old)) {
return;
}

Runnable scaleTask = new Runnable() {
@Override
public void run() {
dedupMapForQueueScale.remove(dedupKey);
TopicConfig topicConfig = deFiBrokerController.getTopicConfigManager().getTopicConfigTable().get(topic);
if (topicConfig != null) {
synchronized (topicConfig) {
Expand All @@ -186,7 +208,7 @@ public void run() {
notifyWhenTopicConfigChange(topic);
} else {
log.info("adjust write queues to {} for [{}] fail. target write queue size can't less than 0 or greater than read queue size[{}]. mode: {}",
adjustWriteQueueSize, topic, topicConfig.getReadQueueNums(), mode);
adjustWriteQueueSize, topic, topicConfig.getReadQueueNums(), mode);
}
} else {
log.info("no need to adjust write queue size for [{}]. now [w:{}/r:{}]. {}", topic, topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums(), mode);
Expand Down Expand Up @@ -250,7 +272,7 @@ public boolean test(String cid) {
String idc = cidArr[cidArr.length - 1];
String clusterName = deFiBrokerController.getBrokerConfig().getBrokerClusterName();
if (clusterName.toUpperCase().startsWith(idc) ||
idc.startsWith(clusterName.toUpperCase())) {
idc.startsWith(clusterName.toUpperCase())) {
return true;
}
}
Expand Down Expand Up @@ -332,7 +354,7 @@ private boolean isAllMessageConsumed(String topic, Set<String> groups, int queue
long ackOffset = deFiBrokerController.getConsumeQueueManager().queryOffset(group, topic, queueId);
if (ackOffset < maxOffset) {
log.info("not finish consume message for topic: {} by group : {}, queueId: {}, ackOffset: {}, maxOffset: {}",
topic, group, queueId, ackOffset, maxOffset);
topic, group, queueId, ackOffset, maxOffset);
return false;
}
}
Expand All @@ -343,4 +365,8 @@ public enum AdjustType {
INCREASE_QUEUE_NUM,
DECREASE_QUEUE_NUM
}

public DeFiBrokerController getDeFiBrokerController() {
return deFiBrokerController;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,20 +105,6 @@ public Set<String> unregisterClientId(final ClientChannelInfo clientChannelInfo)
return whichTopic;
}

@Override
public boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) {
try {
final ClientChannelInfo channelInfo = getChannelInfoTable().get(channel);
if (channelInfo != null) {
unregisterClientId(channelInfo);
}
super.doChannelCloseEvent(remoteAddr, channel);
return true;
} catch (Exception ex) {
log.warn("doChannelCloseEvent fail.", ex);
return false;
}
}

public Set<String> findSubscribedTopicByClientId(final String clientId) {
Set<String> result = new HashSet<>();
Expand All @@ -139,4 +125,8 @@ public Set<String> getClientIdBySubscription(String topic) {
}
return new HashSet<>();
}

public ConcurrentHashMap<String, CopyOnWriteArraySet<String>> getClientIdMap() {
return clientIdMap;
}
}
Loading