Skip to content

Commit

Permalink
[INLONG-8106][DataProxy] Optimize ConfigManager implementation ( part…
Browse files Browse the repository at this point in the history
… one ) (apache#8107)
  • Loading branch information
gosonzhang authored May 29, 2023
1 parent fbd46c0 commit 6e8d77a
Show file tree
Hide file tree
Showing 19 changed files with 916 additions and 224 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,7 @@ public List<Channel> getOptionalChannels(Event event) {
*/
private List<String> splitChannelName(String channelName) {
List<String> fileMetricList = new ArrayList<String>();
if (StringUtils.isEmpty(channelName)) {
LOG.info("channel name is null!");
} else {
if (StringUtils.isNotBlank(channelName)) {
fileMetricList = Arrays.asList(channelName.split("\\s+"));
}
return fileMetricList;
Expand Down Expand Up @@ -145,11 +143,9 @@ public void configure(Context context) {
this.slaveChannels.add(channel);
}
}
LOG.info("masters:" + this.masterChannels);
LOG.info("orders:" + this.orderChannels);
LOG.info("slaves:" + this.slaveChannels);
LOG.info("transfers:" + this.transferChannels);
LOG.info("agentFileMetrics:" + this.agentFileMetricChannels);
LOG.info("slaMetrics:" + this.slaMetricChannels);
LOG.info(
"Configure channels, masters={}, orders={}, slaves={}, transfers={}, agentFileMetrics={}, slaMetrics={}",
this.masterChannels, this.orderChannels, this.slaveChannels,
this.transferChannels, this.agentFileMetricChannels, this.slaMetricChannels);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void executeCallbacks() {
*
* @return - true if configure updated
*/
public abstract boolean loadFromFileToHolder();
protected abstract boolean loadFromFileToHolder();

/**
* check updater
Expand All @@ -92,7 +92,7 @@ public boolean checkAndUpdateHolder() {
if (configFile != null) {
this.lastModifyTime = configFile.lastModified();
}
LOG.info("File {} has changed, reload from local file agent", getFileName());
LOG.info("File {} has changed, reload from local file", this.fileName);
return loadFromFileToHolder();
}
return false;
Expand Down Expand Up @@ -125,8 +125,8 @@ private void setFilePath(String fileName) {
if (url != null) {
this.filePath = url.getPath();
this.configFile = new File(this.filePath);
LOG.info("set file path lastTime: {}, currentTime: {}",
lastModifyTime, configFile.lastModified());
LOG.info("Set {} file path, lastTime: {}, currentTime: {}",
fileName, lastModifyTime, configFile.lastModified());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@
import org.apache.inlong.common.pojo.dataproxy.DataProxyConfigRequest;
import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
import org.apache.inlong.dataproxy.config.holder.BlackListConfigHolder;
import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
import org.apache.inlong.dataproxy.config.holder.GroupIdPropertiesHolder;
import org.apache.inlong.dataproxy.config.holder.IPVisitConfigHolder;
import org.apache.inlong.dataproxy.config.holder.GroupIdNumConfigHolder;
import org.apache.inlong.dataproxy.config.holder.MQClusterConfigHolder;
import org.apache.inlong.dataproxy.config.holder.MxPropertiesHolder;
import org.apache.inlong.dataproxy.config.holder.PropertiesConfigHolder;
import org.apache.inlong.dataproxy.config.holder.SourceReportConfigHolder;
import org.apache.inlong.dataproxy.config.holder.SourceReportInfo;
import org.apache.inlong.dataproxy.config.holder.WeightConfigHolder;
import org.apache.inlong.dataproxy.config.holder.WhiteListConfigHolder;
import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
import org.apache.inlong.dataproxy.consts.AttrConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
Expand All @@ -52,6 +54,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

Expand All @@ -65,15 +68,18 @@ public class ConfigManager {
public static final List<ConfigHolder> CONFIG_HOLDER_LIST = new ArrayList<>();
private static volatile boolean isInit = false;
private static ConfigManager instance = null;
// node weight configure
private final WeightConfigHolder weightConfigHolder = new WeightConfigHolder();
// black list configure
private final BlackListConfigHolder blacklistConfigHolder = new BlackListConfigHolder();
// whitelist configure
private final WhiteListConfigHolder whitelistConfigHolder = new WhiteListConfigHolder();
// group id num 2 name configure
private final GroupIdNumConfigHolder groupIdConfig = new GroupIdNumConfigHolder();

private final MQClusterConfigHolder mqClusterConfigHolder = new MQClusterConfigHolder("mq_cluster.properties");
private final PropertiesConfigHolder topicConfig = new PropertiesConfigHolder("topics.properties");
private final MxPropertiesHolder mxConfig = new MxPropertiesHolder("mx.properties");

private final GroupIdPropertiesHolder groupIdConfig = new GroupIdPropertiesHolder("groupid_mapping.properties");
private final PropertiesConfigHolder weightHolder = new PropertiesConfigHolder("weight.properties");
private final IPVisitConfigHolder blackListConfig = new IPVisitConfigHolder(true, "blacklist.properties");
private final IPVisitConfigHolder whiteListConfig = new IPVisitConfigHolder(false, "whitelist.properties");
// source report configure holder
private final SourceReportConfigHolder sourceReportConfigHolder = new SourceReportConfigHolder();
// mq clusters ready
Expand Down Expand Up @@ -101,8 +107,25 @@ public static ConfigManager getInstance() {
return instance;
}

public Map<String, String> getWeightProperties() {
return weightHolder.getHolder();
// get node weight configure
public double getCpuWeight() {
return weightConfigHolder.getCachedCpuWeight();
}

public double getNetInWeight() {
return weightConfigHolder.getCachedNetInWeight();
}

public double getNetOutWeight() {
return weightConfigHolder.getCachedNetOutWeight();
}

public double getTcpWeight() {
return weightConfigHolder.getCachedTcpWeight();
}

public double getCpuThresholdWeight() {
return weightConfigHolder.getCachedCpuThreshold();
}

/**
Expand Down Expand Up @@ -134,24 +157,59 @@ public boolean deleteTopicProperties(Map<String, String> result) {
return updatePropertiesHolder(result, topicConfig, false);
}

public Map<String, String> getMxProperties() {
return mxConfig.getHolder();
// get groupId num 2 name info
public boolean isEnableNum2NameTrans(String groupIdNum) {
return groupIdConfig.isEnableNum2NameTrans(groupIdNum);
}

public boolean isGroupIdNumConfigEmpty() {
return groupIdConfig.isGroupIdNumConfigEmpty();
}

public boolean isStreamIdNumConfigEmpty() {
return groupIdConfig.isStreamIdNumConfigEmpty();
}

public String getGroupIdNameByNum(String groupIdNum) {
return groupIdConfig.getGroupIdNameByNum(groupIdNum);
}

public String getStreamIdNameByIdNum(String groupIdNum, String streamIdNum) {
return groupIdConfig.getStreamIdNameByIdNum(groupIdNum, streamIdNum);
}

public ConcurrentHashMap<String, String> getGroupIdNumMap() {
return groupIdConfig.getGroupIdNumMap();
}

public ConcurrentHashMap<String, ConcurrentHashMap<String, String>> getStreamIdNumMap() {
return groupIdConfig.getStreamIdNumMap();
}

// get blacklist whitelist configure info
public void regIPVisitConfigChgCallback(ConfigUpdateCallback callback) {
blackListConfig.addUpdateCallback(callback);
whiteListConfig.addUpdateCallback(callback);
blacklistConfigHolder.addUpdateCallback(callback);
whitelistConfigHolder.addUpdateCallback(callback);
}

public boolean needChkIllegalIP() {
return (!blackListConfig.isEmptyConfig()
|| CommonConfigHolder.getInstance().isEnableWhiteList());
return (blacklistConfigHolder.needCheckBlacklist()
|| whitelistConfigHolder.needCheckWhitelist());
}

public boolean isIllegalIP(String strRemoteIP) {
return strRemoteIP == null
|| blackListConfig.isContain(strRemoteIP)
|| (CommonConfigHolder.getInstance().isEnableWhiteList() && !whiteListConfig.isContain(strRemoteIP));
|| blacklistConfigHolder.isIllegalIP(strRemoteIP)
|| whitelistConfigHolder.isIllegalIP(strRemoteIP);
}

// get mx configure info
public Map<String, Map<String, String>> getMxPropertiesMaps() {
return mxConfig.getMxPropertiesMaps();
}

public Map<String, String> getMxProperties() {
return mxConfig.getHolder();
}

public boolean addMxProperties(Map<String, String> result) {
Expand Down Expand Up @@ -260,22 +318,6 @@ private boolean updatePropertiesHolder(Map<String, String> result,
}
}

public Map<String, Map<String, String>> getMxPropertiesMaps() {
return mxConfig.getMxPropertiesMaps();
}

public Map<String, String> getGroupIdMappingProperties() {
return groupIdConfig.getGroupIdMappingProperties();
}

public Map<String, Map<String, String>> getStreamIdMappingProperties() {
return groupIdConfig.getStreamIdMappingProperties();
}

public Map<String, String> getGroupIdEnableMappingProperties() {
return groupIdConfig.getGroupIdEnableMappingProperties();
}

public PropertiesConfigHolder getTopicConfig() {
return topicConfig;
}
Expand Down Expand Up @@ -395,11 +437,17 @@ private boolean checkWithManager(String host, String clusterName, String cluster
}

for (DataProxyTopicInfo topic : configJson.getData().getTopicList()) {
if (topic == null
|| !topic.isValid()
|| StringUtils.isBlank(topic.getInlongGroupId())
|| StringUtils.isBlank(topic.getTopic())) {
continue;
}
if (!StringUtils.isEmpty(topic.getM())) {
groupIdToMValue.put(topic.getInlongGroupId(), topic.getM());
}
if (!StringUtils.isEmpty(topic.getTopic())) {
groupIdToTopic.put(topic.getInlongGroupId(), topic.getTopic());
groupIdToTopic.put(topic.getInlongGroupId().trim(), topic.getTopic().trim());
}
}
configManager.updateMxProperties(groupIdToMValue);
Expand Down
Loading

0 comments on commit 6e8d77a

Please sign in to comment.