Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: update pull config compatible with different keys #404

Merged
merged 1 commit into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ private LogDataDTO hit2DTO(SearchHit hit, List<String> keyList) {
LogDataDTO logData = new LogDataDTO();
Map<String, Object> ferry = hit.getSourceAsMap();
long time = 0;
if (ferry.containsKey("time")) {
if (ferry.containsKey("time") && null != ferry.get("time") && StringUtils.isNotBlank(ferry.get("time").toString())) {
time = DateUtil.parse(ferry.get("time").toString()).toTimestamp().getTime();
}
if (null == ferry.get(LogParser.esKeyMap_timestamp)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ private LogDataDTO hit2DTO(SearchHit hit, List<String> keyList) {
Map<String, Object> ferry = hit.getSourceAsMap();

long time = 0;
if (ferry.containsKey("time")) {
if (ferry.containsKey("time") && null != ferry.get("time") && StringUtils.isNotBlank(ferry.get("time").toString())) {
time = DateUtil.parse(ferry.get("time").toString()).toTimestamp().getTime();
}
if (null == ferry.get(LogParser.esKeyMap_timestamp)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,15 +133,16 @@ public void init() {
@Override
public void publishStreamConfig(Long spaceId, Integer type, Integer projectTypeCode, String motorRoomEn) {
//1.Query all stream machine IPs - real-time query
List<String> mioneStreamIpList = tailExtensionService.fetchStreamUniqueKeyList(fetchStreamMachineService, spaceId, motorRoomEn);
log.info("Query the list of machines in log-stream:{}", new Gson().toJson(mioneStreamIpList));
List<String> streamIpList = tailExtensionService.fetchStreamUniqueKeyList(fetchStreamMachineService, spaceId, motorRoomEn);
log.info("Query the list of machines in log-stream:{}", new Gson().toJson(streamIpList));
//2.send msg
streamConfigNacosPublisher.publish(spaceId, dealStreamConfigByRule(mioneStreamIpList, spaceId, type));
streamConfigNacosPublisher.publish(spaceId, dealStreamConfigByRule(streamIpList, spaceId, type));
tailExtensionService.publishStreamConfigPostProcess(streamConfigNacosPublisher, spaceId, motorRoomEn);
}

private synchronized MiLogStreamConfig dealStreamConfigByRule(List<String> ipList, Long spaceId, Integer type) {
MiLogStreamConfig existConfig = streamConfigNacosProvider.getConfig(spaceId);
ipList = ensureDefaultCompatibility(existConfig, ipList);
// New configuration
String spaceKey = CommonExtensionServiceFactory.getCommonExtensionService().getLogManagePrefix() + TAIL_CONFIG_DATA_ID + spaceId;
if (null == existConfig || OperateEnum.ADD_OPERATE.getCode().equals(type) || OperateEnum.UPDATE_OPERATE.getCode().equals(type)) {
Expand Down Expand Up @@ -178,8 +179,9 @@ private synchronized MiLogStreamConfig dealStreamConfigByRule(List<String> ipLis
// The number of name spaces held per machine
Map<String, Integer> ipSizeMap = config.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, stringMapEntry -> stringMapEntry.getValue().size()));
List<String> finalIpList = ipList;
String key = ipSizeMap.entrySet().stream()
.filter(entry -> ipList.contains(entry.getKey()))
.filter(entry -> finalIpList.contains(entry.getKey()))
.min(Map.Entry.comparingByValue()).get().getKey();
config.get(key).put(spaceId, spaceKey);
}
Expand All @@ -195,6 +197,21 @@ private synchronized MiLogStreamConfig dealStreamConfigByRule(List<String> ipLis
return existConfig;
}

/**
* compatible When the queried IP is different from the actual one, the actual one is returned
* @param existConfig
* @param ipList
* @return
*/
private List<String> ensureDefaultCompatibility(MiLogStreamConfig existConfig, List<String> ipList) {
Set<String> keySet = existConfig.getConfig().keySet();
if (!CollectionUtils.isEqualCollection(keySet, ipList)) {
log.info("ipList not belong to config,query list:{},actual list:{}", GSON.toJson(ipList), GSON.toJson(keySet));
ipList = keySet.stream().toList();
}
return ipList;
}

@Override
public void publishNameSpaceConfig(String motorRoomEn, Long spaceId, Long storeId, Long tailId, Integer type, String changeType) {
Assert.notNull(spaceId, "logSpaceId not empty");
Expand Down
Loading