diff --git a/defibus-broker/src/main/java/com/webank/defibus/broker/DeFiBrokerController.java b/defibus-broker/src/main/java/com/webank/defibus/broker/DeFiBrokerController.java index 4a4db14..e0d1e7c 100644 --- a/defibus-broker/src/main/java/com/webank/defibus/broker/DeFiBrokerController.java +++ b/defibus-broker/src/main/java/com/webank/defibus/broker/DeFiBrokerController.java @@ -96,7 +96,7 @@ public DeFiBrokerController(BrokerConfig brokerConfig, NettyServerConfig nettySe ConsumerIdsChangeListener consumerIdsChangeListener = (ConsumerIdsChangeListener) ReflectUtil.getSimpleProperty(BrokerController.class, this, "consumerIdsChangeListener"); AdjustQueueNumStrategy adjustQueueNumStrategy = new AdjustQueueNumStrategy(this); - consumerManager = new DeFiConsumerManager(consumerIdsChangeListener, adjustQueueNumStrategy); + consumerManager = new DeFiConsumerManager(consumerIdsChangeListener, adjustQueueNumStrategy,deFiBusBrokerConfig); this.deFiManageExecutor = Executors.newFixedThreadPool(brokerConfig.getClientManageThreadPoolNums(), new ThreadFactoryImpl( @@ -209,6 +209,7 @@ public void shutdown() { deFiManageExecutor.shutdown(); sendReplyMessageExecutor.shutdown(); pushReplyMessageExecutor.shutdown(); + consumerManager.getNotifyClientExecutor().shutdown(); deFiScheduledExecutorService.shutdown(); sendReplyScheduledExecutorService.shutdown(); @@ -275,11 +276,12 @@ public DeFiProducerManager getProducerManager() { @Override public void printWaterMark() { - LOG_WATER_MARK.info("{\"SendQueueSize\":\"{}\",\"PullQueueSize\":\"{}\",\"GotQueueSize\":\"{}\",\"PushQueueSize\":\"{}\",\"SendSlowTimeMills\":\"{}\",\"PullSlowTimeMills\":\"{}\",\"HeartbeatQueueSize\":\"{}\"}", + LOG_WATER_MARK.info("{\"SendQueueSize\":\"{}\",\"PullQueueSize\":\"{}\",\"GotQueueSize\":\"{}\",\"PushQueueSize\":\"{}\",\"NotifyClientQueueSize\":\"{}\",\"SendSlowTimeMills\":\"{}\",\"PullSlowTimeMills\":\"{}\",\"HeartbeatQueueSize\":\"{}\"}", this.getSendThreadPoolQueue().size(), this.getPullThreadPoolQueue().size(), this.sendReplyThreadPoolQueue.size(), this.pushReplyThreadPoolQueue.size(), + consumerManager.getNotifyClientThreadPoolQueue().size(), this.headSlowTimeMills4SendThreadPoolQueue(), this.headSlowTimeMills4PullThreadPoolQueue(), this.getHeartbeatThreadPoolQueue().size()); @@ -321,4 +323,5 @@ public MessageRedirectManager getMessageRedirectManager() { public ClientRebalanceResultManager getClientRebalanceResultManager() { return clientRebalanceResultManager; } + } diff --git a/defibus-broker/src/main/java/com/webank/defibus/broker/DeFiBusBrokerStartup.java b/defibus-broker/src/main/java/com/webank/defibus/broker/DeFiBusBrokerStartup.java index c99af44..ca7e2bc 100644 --- a/defibus-broker/src/main/java/com/webank/defibus/broker/DeFiBusBrokerStartup.java +++ b/defibus-broker/src/main/java/com/webank/defibus/broker/DeFiBusBrokerStartup.java @@ -168,11 +168,11 @@ public static DeFiBrokerController createBrokerController(String[] args) { messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1); - LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); - JoranConfigurator configurator = new JoranConfigurator(); - configurator.setContext(lc); - lc.reset(); - configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml"); +// LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); +// JoranConfigurator configurator = new JoranConfigurator(); +// configurator.setContext(lc); +// lc.reset(); +// configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml"); log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); MixAll.printObjectProperties(log, brokerConfig); diff --git a/defibus-broker/src/main/java/com/webank/defibus/broker/client/AdjustQueueNumStrategy.java b/defibus-broker/src/main/java/com/webank/defibus/broker/client/AdjustQueueNumStrategy.java index 7a666dd..b6bf8c9 100644 --- a/defibus-broker/src/main/java/com/webank/defibus/broker/client/AdjustQueueNumStrategy.java +++ b/defibus-broker/src/main/java/com/webank/defibus/broker/client/AdjustQueueNumStrategy.java @@ -20,6 +20,7 @@ import com.webank.defibus.broker.DeFiBrokerController; import com.webank.defibus.common.DeFiBusConstant; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.broker.client.ConsumerGroupInfo; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; @@ -44,6 +45,9 @@ public class AdjustQueueNumStrategy { private final DeFiBrokerController deFiBrokerController; private final ScheduledThreadPoolExecutor autoScaleQueueSizeExecutorService; + private final String dedupKeyPrefixForQueueScale = "QS$"; + private final ConcurrentHashMap dedupMapForQueueScale = new ConcurrentHashMap<>(); + public AdjustQueueNumStrategy(final DeFiBrokerController deFiBrokerController) { this.deFiBrokerController = deFiBrokerController; @@ -92,11 +96,20 @@ private void adjustQueueNumByConsumerCount(String topic, AdjustType scaleType) { } private void adjustReadQueueNumByConsumerCount(String topic, long delayMills, AdjustType mode) { + String dedupKey = dedupKeyPrefixForQueueScale + "R$" + mode + "$" + topic; + String old = dedupMapForQueueScale.putIfAbsent(dedupKey,topic); + //若old不为空,表示该topic存在即将执行的任务,本次任务可以忽略 + //若不存在,表示该topic的任务正在执行,或者没有任务,本次不可忽略 + if(StringUtils.isNotBlank(old)) { + return; + } + Runnable scaleQueueTask = new Runnable() { private int alreadyRetryTimes = 0; @Override public void run() { + dedupMapForQueueScale.remove(dedupKey); TopicConfig topicConfig = deFiBrokerController.getTopicConfigManager().getTopicConfigTable().get(topic); if (topicConfig != null) { synchronized (topicConfig) { @@ -118,7 +131,7 @@ public void run() { log.info("try adjust read queue size to {} for [{}], prev: {}, {}", adjustReadQueueSize, topic, topicConfig.getReadQueueNums(), mode); if (adjustReadQueueSize < topicConfig.getWriteQueueNums()) { log.info("adjust read queues to {} for [{}] fail. read queue size can't less than write queue size[{}]. {}", - adjustReadQueueSize, topic, topicConfig.getWriteQueueNums(), mode); + adjustReadQueueSize, topic, topicConfig.getWriteQueueNums(), mode); return; } boolean canAdjustReadQueueSize = isCanAdjustReadQueueSize(topic, adjustReadQueueSize); @@ -157,9 +170,18 @@ public void run() { } private void adjustWriteQueueNumByConsumerCount(String topic, long delayMills, AdjustType mode) { + String dedupKey = dedupKeyPrefixForQueueScale + "W$" + mode + "$" + topic; + String old = dedupMapForQueueScale.putIfAbsent(dedupKey,topic); + //若old不为空,表示该topic存在即将执行的任务,本次任务可以忽略 + //若不存在,表示该topic的任务正在执行,或者没有任务,本次不可忽略 + if(StringUtils.isNotBlank(old)) { + return; + } + Runnable scaleTask = new Runnable() { @Override public void run() { + dedupMapForQueueScale.remove(dedupKey); TopicConfig topicConfig = deFiBrokerController.getTopicConfigManager().getTopicConfigTable().get(topic); if (topicConfig != null) { synchronized (topicConfig) { @@ -186,7 +208,7 @@ public void run() { notifyWhenTopicConfigChange(topic); } else { log.info("adjust write queues to {} for [{}] fail. target write queue size can't less than 0 or greater than read queue size[{}]. mode: {}", - adjustWriteQueueSize, topic, topicConfig.getReadQueueNums(), mode); + adjustWriteQueueSize, topic, topicConfig.getReadQueueNums(), mode); } } else { log.info("no need to adjust write queue size for [{}]. now [w:{}/r:{}]. {}", topic, topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums(), mode); @@ -250,7 +272,7 @@ public boolean test(String cid) { String idc = cidArr[cidArr.length - 1]; String clusterName = deFiBrokerController.getBrokerConfig().getBrokerClusterName(); if (clusterName.toUpperCase().startsWith(idc) || - idc.startsWith(clusterName.toUpperCase())) { + idc.startsWith(clusterName.toUpperCase())) { return true; } } @@ -332,7 +354,7 @@ private boolean isAllMessageConsumed(String topic, Set groups, int queue long ackOffset = deFiBrokerController.getConsumeQueueManager().queryOffset(group, topic, queueId); if (ackOffset < maxOffset) { log.info("not finish consume message for topic: {} by group : {}, queueId: {}, ackOffset: {}, maxOffset: {}", - topic, group, queueId, ackOffset, maxOffset); + topic, group, queueId, ackOffset, maxOffset); return false; } } @@ -343,4 +365,8 @@ public enum AdjustType { INCREASE_QUEUE_NUM, DECREASE_QUEUE_NUM } + + public DeFiBrokerController getDeFiBrokerController() { + return deFiBrokerController; + } } diff --git a/defibus-broker/src/main/java/com/webank/defibus/broker/client/DeFiConsumerGroupInfo.java b/defibus-broker/src/main/java/com/webank/defibus/broker/client/DeFiConsumerGroupInfo.java index 5bc5b99..ff32315 100644 --- a/defibus-broker/src/main/java/com/webank/defibus/broker/client/DeFiConsumerGroupInfo.java +++ b/defibus-broker/src/main/java/com/webank/defibus/broker/client/DeFiConsumerGroupInfo.java @@ -105,20 +105,6 @@ public Set unregisterClientId(final ClientChannelInfo clientChannelInfo) return whichTopic; } - @Override - public boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) { - try { - final ClientChannelInfo channelInfo = getChannelInfoTable().get(channel); - if (channelInfo != null) { - unregisterClientId(channelInfo); - } - super.doChannelCloseEvent(remoteAddr, channel); - return true; - } catch (Exception ex) { - log.warn("doChannelCloseEvent fail.", ex); - return false; - } - } public Set findSubscribedTopicByClientId(final String clientId) { Set result = new HashSet<>(); @@ -139,4 +125,8 @@ public Set getClientIdBySubscription(String topic) { } return new HashSet<>(); } + + public ConcurrentHashMap> getClientIdMap() { + return clientIdMap; + } } diff --git a/defibus-broker/src/main/java/com/webank/defibus/broker/client/DeFiConsumerManager.java b/defibus-broker/src/main/java/com/webank/defibus/broker/client/DeFiConsumerManager.java index cecd755..19b5393 100644 --- a/defibus-broker/src/main/java/com/webank/defibus/broker/client/DeFiConsumerManager.java +++ b/defibus-broker/src/main/java/com/webank/defibus/broker/client/DeFiConsumerManager.java @@ -17,13 +17,34 @@ package com.webank.defibus.broker.client; +import com.webank.defibus.common.DeFiBusBrokerConfig; import com.webank.defibus.common.util.ReflectUtil; +import io.netty.channel.Channel; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; + import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.ConsumerGroupEvent; import org.apache.rocketmq.broker.client.ConsumerGroupInfo; import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener; import org.apache.rocketmq.broker.client.ConsumerManager; +import org.apache.rocketmq.common.ThreadFactoryImpl; +import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; @@ -34,14 +55,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import io.netty.channel.Channel; - public class DeFiConsumerManager extends ConsumerManager { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120; @@ -49,13 +62,26 @@ public class DeFiConsumerManager extends ConsumerManager { new ConcurrentHashMap(1024); private final ConsumerIdsChangeListener consumerIdsChangeListener; private final AdjustQueueNumStrategy adjustQueueNumStrategy; + private final ExecutorService notifyClientExecutor; + private final BlockingQueue notifyClientThreadPoolQueue; + + private final ConcurrentHashMap dedupMapForNotifyClientChange = new ConcurrentHashMap<>(); + private final String dedupKeyPrefixForNotifyClientChange = "NCC$"; @SuppressWarnings("unchecked") public DeFiConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener, - final AdjustQueueNumStrategy strategy) { + final AdjustQueueNumStrategy strategy, DeFiBusBrokerConfig deFiBusBrokerConfig) { super(consumerIdsChangeListener); this.consumerIdsChangeListener = consumerIdsChangeListener; this.adjustQueueNumStrategy = strategy; + this.notifyClientThreadPoolQueue = new LinkedBlockingQueue(deFiBusBrokerConfig.getNotifyClientThreadPoolQueueCapacity()); + this.notifyClientExecutor = new ThreadPoolExecutor( + deFiBusBrokerConfig.getNotifyClientThreadPoolNums(), + deFiBusBrokerConfig.getNotifyClientThreadPoolNums(), + 1000 * 60, + TimeUnit.MILLISECONDS, + this.notifyClientThreadPoolQueue, + new ThreadFactoryImpl("notifyClientThread_")); try { this.consumerTable = (ConcurrentHashMap) ReflectUtil.getSimpleProperty(ConsumerManager.class, this, "consumerTable"); @@ -83,7 +109,8 @@ public boolean registerConsumer(final String group, final ClientChannelInfo clie if (r1 || r2) { adjustQueueNum(oldSub, subList); if (isNotifyConsumerIdsChangedEnable) { - this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel()); +// this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel()); + asyncNotifyClientChange(group, deFiConsumerGroupInfo); } } @@ -116,18 +143,30 @@ public void doChannelCloseEvent(final String remoteAddr, final Channel channel) while (it.hasNext()) { Map.Entry next = it.next(); ConsumerGroupInfo info = next.getValue(); + String group = next.getKey(); + DeFiConsumerGroupInfo deFiConsumerGroupInfo = (DeFiConsumerGroupInfo) info; if (info.getChannelInfoTable().get(channel) != null) { ClientChannelInfo clientChannelInfo = info.getChannelInfoTable().get(channel); - DeFiConsumerGroupInfo deFiConsumerGroupInfo = (DeFiConsumerGroupInfo) info; - subscribeTopics = deFiConsumerGroupInfo.findSubscribedTopicByClientId(clientChannelInfo.getClientId()); - } - } - super.doChannelCloseEvent(remoteAddr, channel); - - if (subscribeTopics != null) { - for (String topic : subscribeTopics) { - adjustQueueNumStrategy.decreaseQueueNum(topic); + //移除clientId + subscribeTopics = deFiConsumerGroupInfo.unregisterClientId(clientChannelInfo); + //缩Q + if (subscribeTopics != null) { + for (String topic : subscribeTopics) { + adjustQueueNumStrategy.decreaseQueueNum(topic); + } + } + //移除channel + boolean removed = info.doChannelCloseEvent(remoteAddr, channel); + if (removed && info.getChannelInfoTable().isEmpty()) { + ConsumerGroupInfo remove = this.consumerTable.remove(group); + if (remove != null) { + log.info("unregister consumer ok, no connection in this group,remove consumer group from consumerTable, {}", + group); + } + } } + //异步通知clientId变化 + asyncNotifyClientChange(group, deFiConsumerGroupInfo); } } @@ -158,6 +197,8 @@ public void scanNotActiveChannel() { adjustQueueNumStrategy.decreaseQueueNum(topic); } } + //异步通知clientId变化 + asyncNotifyClientChange(group, consumerGroupInfo); } } @@ -167,9 +208,77 @@ public void scanNotActiveChannel() { group); it.remove(); } + scanDirtyClientIdByGroup(consumerGroupInfo); + } + + } + + private void asyncNotifyClientChange(final String group, DeFiConsumerGroupInfo deFiConsumerGroupInfo) { + try { + String dedupKey = dedupKeyPrefixForNotifyClientChange + group; + String old = dedupMapForNotifyClientChange.putIfAbsent(dedupKey, group); + if (StringUtils.isNotBlank(old)) { + return; + } + //如果 dedupMap 中存在该group, 则表明存在 还未被执行的通知,可忽略本次 + //如果 dedupMap 中不存在该group, 则表明要么存在正在执行的通知, 要么不存在即将被执行的通知, 需要将本次排队 + this.notifyClientExecutor.execute(() -> { + try { + dedupMapForNotifyClientChange.remove(dedupKey); + if (deFiConsumerGroupInfo == null) { + return; + } + consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, getNotifyClientChannel(deFiConsumerGroupInfo)); + } catch (Exception ex) { + } + }); + } catch (RejectedExecutionException re) { + log.info("async notify client discard, group: {}", group); } } + public List getNotifyClientChannel(DeFiConsumerGroupInfo deFiConsumerGroupInfo) { + long startTime = System.currentTimeMillis(); + List result = new ArrayList<>(); + Set notifyClientId = new HashSet<>(); + Iterator>> it = deFiConsumerGroupInfo.getClientIdMap().entrySet().iterator(); + while (it.hasNext()) { + Map.Entry> entry = it.next(); + TopicConfig topicConfig = this.adjustQueueNumStrategy.getDeFiBrokerController().getTopicConfigManager().selectTopicConfig(entry.getKey()); + if (topicConfig != null) { + notifyClientId.addAll(entry.getValue()); + } + } + Iterator> channelInfoIter = deFiConsumerGroupInfo.getChannelInfoTable().entrySet().iterator(); + while (channelInfoIter.hasNext()) { + Map.Entry entry = channelInfoIter.next(); + if (entry.getValue() != null && notifyClientId.contains(entry.getValue().getClientId())) { + result.add(entry.getKey()); + } + } + + List activeChannels = new ArrayList<>(); + for (Channel chl : result) { + if (chl.isActive()) { + activeChannels.add(chl); + } + } + + if (result.size() > 0) { + float activeRatio = (float) activeChannels.size() / result.size(); + if (activeRatio <= 0.5f) { + log.info("inactive channel in group[{}] too much, activeChannels[{}], totalChannel[{}]", + deFiConsumerGroupInfo.getGroupName(), activeChannels.size(), result.size()); + } + } + + long endTime = System.currentTimeMillis(); + if ((endTime - startTime) >= 5) { + log.info("getNotifyClientChannel too long, time {} ms", (endTime - startTime)); + } + return activeChannels; + } + private void adjustQueueNum(final Set oldSub, final Set subList) { for (SubscriptionData subscriptionData : subList) { if (!oldSub.contains(subscriptionData.getTopic())) { @@ -199,4 +308,42 @@ public void notifyWhenTopicConfigChange(String topic) { public ConcurrentHashMap getConsumerTable() { return this.consumerTable; } + + public ExecutorService getNotifyClientExecutor() { + return notifyClientExecutor; + } + + public BlockingQueue getNotifyClientThreadPoolQueue() { + return notifyClientThreadPoolQueue; + } + + public void scanDirtyClientIdByGroup(DeFiConsumerGroupInfo groupInfo) { + if (groupInfo != null) { + List allChannelClientId = groupInfo.getAllClientId(); + Iterator>> it = groupInfo.getClientIdMap().entrySet().iterator(); + while (it.hasNext()) { + Map.Entry> next = it.next(); + Set topicCid = new HashSet<>(next.getValue()); + for (String cid : topicCid) { + if (!allChannelClientId.contains(cid)) { + //check again to avoid mistaken + allChannelClientId = groupInfo.getAllClientId(); + if (!allChannelClientId.contains(cid)) { + log.warn("SCAN DIRTY CLIENTID : {} in [{}] has no channel, maybe dirty. group: {} AllChannelClientId: {}", cid, next.getKey(), groupInfo.getGroupName(), allChannelClientId); + if (this.adjustQueueNumStrategy.getDeFiBrokerController().getDeFiBusBrokerConfig().isAutoCleanDirtyClientId()) { + boolean removed = next.getValue().remove(cid); + if (removed) { + log.info("remove dirty clientId {} from {} success. {}", cid, next.getKey(), groupInfo.getGroupName()); + this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, groupInfo.getGroupName(), getNotifyClientChannel(groupInfo)); + } else { + log.info("remove dirty clientId {} from {} fail. group: {} current cidList: {}", cid, next.getKey(), groupInfo.getGroupName(), next.getValue()); + } + } + } + } + } + } + } + } + } diff --git a/defibus-common/src/main/java/com/webank/defibus/common/DeFiBusBrokerConfig.java b/defibus-common/src/main/java/com/webank/defibus/common/DeFiBusBrokerConfig.java index ed904fa..99bb859 100644 --- a/defibus-common/src/main/java/com/webank/defibus/common/DeFiBusBrokerConfig.java +++ b/defibus-common/src/main/java/com/webank/defibus/common/DeFiBusBrokerConfig.java @@ -28,6 +28,10 @@ public class DeFiBusBrokerConfig { private int pushReplyThreadPoolQueueCapacity = 10000; + private int notifyClientThreadPoolQueueCapacity = 50000; + + private int notifyClientThreadPoolNums = Math.min(12, Runtime.getRuntime().availableProcessors()); + @ImportantField private String rmqAddressServerSubGroup = "namesrvAddr"; @@ -44,6 +48,8 @@ public class DeFiBusBrokerConfig { private boolean redirectMessageEnable = true; + private boolean autoCleanDirtyClientId = true; + // whether reject sending when the depth exceeds threshold @ImportantField private boolean rejectSendWhenMaxDepth = true; @@ -175,4 +181,28 @@ public boolean isCheckQueueListening() { public int getCheckQueueListeningPeriod() { return checkQueueListeningPeriod; } + + public int getNotifyClientThreadPoolNums() { + return notifyClientThreadPoolNums; + } + + public void setNotifyClientThreadPoolNums(int notifyClientThreadPoolNums) { + this.notifyClientThreadPoolNums = notifyClientThreadPoolNums; + } + + public int getNotifyClientThreadPoolQueueCapacity() { + return notifyClientThreadPoolQueueCapacity; + } + + public void setNotifyClientThreadPoolQueueCapacity(int notifyClientThreadPoolQueueCapacity) { + this.notifyClientThreadPoolQueueCapacity = notifyClientThreadPoolQueueCapacity; + } + + public boolean isAutoCleanDirtyClientId() { + return autoCleanDirtyClientId; + } + + public void setAutoCleanDirtyClientId(boolean autoCleanDirtyClientId) { + this.autoCleanDirtyClientId = autoCleanDirtyClientId; + } }