From 01696d18a4fac3530e01c8e22cc82fe4f559c7b9 Mon Sep 17 00:00:00 2001 From: wtt <1136220284@qq.com> Date: Fri, 10 May 2024 10:34:16 +0800 Subject: [PATCH 1/3] refactor: fix manager query conunt and agent print log --- .../xiaomi/mone/test/SpringBootFeatTest.java | 68 +++++++++++++ .../java/com/xiaomi/mone/test/UserBean.java | 24 +++++ .../log/agent/channel/ChannelServiceImpl.java | 24 ++++- .../manager/dao/MilogAppMiddlewareRelDao.java | 4 +- .../mone/log/manager/dao/MilogLogTailDao.java | 6 +- .../mone/log/stream/plugin/es/EsPlugin.java | 96 ++++++++++--------- 6 files changed, 169 insertions(+), 53 deletions(-) create mode 100644 ozhera-app/app-server/src/test/java/com/xiaomi/mone/test/SpringBootFeatTest.java create mode 100644 ozhera-app/app-server/src/test/java/com/xiaomi/mone/test/UserBean.java diff --git a/ozhera-app/app-server/src/test/java/com/xiaomi/mone/test/SpringBootFeatTest.java b/ozhera-app/app-server/src/test/java/com/xiaomi/mone/test/SpringBootFeatTest.java new file mode 100644 index 00000000..8f6a12b3 --- /dev/null +++ b/ozhera-app/app-server/src/test/java/com/xiaomi/mone/test/SpringBootFeatTest.java @@ -0,0 +1,68 @@ +package com.xiaomi.mone.test; + +import com.xiaomi.mone.app.AppBootstrap; +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; + +import javax.annotation.Resource; +import javax.validation.ConstraintViolation; +import javax.validation.Validation; +import javax.validation.Validator; +import javax.validation.ValidatorFactory; +import java.util.Set; + +/** + * + * @description + * @version 1.0 + * @author wtt + * @date 2024/4/28 10:02 + * + */ +//@SpringBootTest(classes = AppBootstrap.class) +public class SpringBootFeatTest { + + @Resource + private Validator validator; + + @Test + public void validatorSpringTest() { + validator(validator); + } + + private void validator(Validator validator) { + + UserBean userBean = new UserBean(); + userBean.setUserName(""); + userBean.setAge(17); + + Set> constraintViolations = validator.validate(userBean); + System.out.println("validate 校验对象属性:"); + System.out.println(constraintViolations); + System.out.println(); + + constraintViolations = validator.validateProperty(userBean, "age"); + System.out.println("validate 校验对象属性age:"); + System.out.println(constraintViolations); + System.out.println(); + + constraintViolations = validator.validateValue(UserBean.class, "age", 15); + System.out.println("validate 校验对象属性age:"); + System.out.println(constraintViolations); + } + + @Test + public void validatorJavaTest() { + Validator validator1 = ValidationUtils.getValidator(); + validator(validator1); + } + + public static class ValidationUtils { + + public static Validator getValidator() { + ValidatorFactory validatorFactory = Validation.buildDefaultValidatorFactory(); + return validatorFactory.getValidator(); + } + } + +} diff --git a/ozhera-app/app-server/src/test/java/com/xiaomi/mone/test/UserBean.java b/ozhera-app/app-server/src/test/java/com/xiaomi/mone/test/UserBean.java new file mode 100644 index 00000000..3490fecd --- /dev/null +++ b/ozhera-app/app-server/src/test/java/com/xiaomi/mone/test/UserBean.java @@ -0,0 +1,24 @@ +package com.xiaomi.mone.test; + +import lombok.Data; + +import javax.validation.constraints.Min; +import javax.validation.constraints.NotBlank; + +/** + * + * @description + * @version 1.0 + * @author wtt + * @date 2024/4/28 10:10 + * + */ +@Data +public class UserBean { + + @NotBlank + private String userName; + + @Min(value = 18, message = "age最小值为18") + private Integer age; +} diff --git a/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/ChannelServiceImpl.java b/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/ChannelServiceImpl.java index 65691bdd..01e36bd9 100644 --- a/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/ChannelServiceImpl.java +++ b/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/ChannelServiceImpl.java @@ -382,9 +382,8 @@ private void readFile(String patternCode, String ip, String filePath, Long chann ReadListener listener = initFileReadListener(mLog, patternCode, usedIp, filePath); Map fileProgressMap = channelMemory.getFileProgressMap(); - if (!collectOnce) { - log.info("fileProgressMap:{}", gson.toJson(fileProgressMap)); - } + printMapToJson(fileProgressMap, collectOnce); + ILogFile logFile = getLogFile(filePath, listener, fileProgressMap); if (null == logFile) { log.warn("file:{} marked stop to collect", filePath); @@ -420,6 +419,25 @@ private void stopOldCurrentFileThread(String filePath) { } } + private void printMapToJson(Map map, boolean collectOnce) { + if (map == null || map.isEmpty()) { + return; + } + + Map snapshot; + try { + snapshot = new HashMap<>(map); + } catch (ConcurrentModificationException e) { + log.error("Failed to create snapshot of fileProgressMap", e); + return; + } + + if (!collectOnce && !snapshot.isEmpty()) { + String jsonMap = gson.toJson(snapshot); + log.info("fileProgressMap: {}", jsonMap); + } + } + private ILogFile getLogFile(String filePath, ReadListener listener, Map fileProgressMap) { diff --git a/ozhera-log/log-manager/src/main/java/com/xiaomi/mone/log/manager/dao/MilogAppMiddlewareRelDao.java b/ozhera-log/log-manager/src/main/java/com/xiaomi/mone/log/manager/dao/MilogAppMiddlewareRelDao.java index 34d5a9eb..11967f65 100644 --- a/ozhera-log/log-manager/src/main/java/com/xiaomi/mone/log/manager/dao/MilogAppMiddlewareRelDao.java +++ b/ozhera-log/log-manager/src/main/java/com/xiaomi/mone/log/manager/dao/MilogAppMiddlewareRelDao.java @@ -136,8 +136,8 @@ public List getAppRelByLimit(int offset, int rows) { } public Integer queryCountByTopicName(String topicName) { - Sql sql = Sqls.queryRecord("SELECT count(1) as count FROM `milog_app_middleware_rel` where config like '%' || @topicName || '%'"); - sql.params().set("topicName", topicName); + Sql sql = Sqls.queryRecord("SELECT count(1) as count FROM `milog_app_middleware_rel` where config like @topicName "); + sql.params().set("topicName", "%" + topicName + "%"); LinkedList records = (LinkedList) dao.execute(sql).getResult(); int access = records.get(0).getInt("count"); return access; diff --git a/ozhera-log/log-manager/src/main/java/com/xiaomi/mone/log/manager/dao/MilogLogTailDao.java b/ozhera-log/log-manager/src/main/java/com/xiaomi/mone/log/manager/dao/MilogLogTailDao.java index 2daac45e..be9bbd1e 100644 --- a/ozhera-log/log-manager/src/main/java/com/xiaomi/mone/log/manager/dao/MilogLogTailDao.java +++ b/ozhera-log/log-manager/src/main/java/com/xiaomi/mone/log/manager/dao/MilogLogTailDao.java @@ -280,12 +280,10 @@ public List queryStoreIdByRegionNameEN(String nameEn) { } public List queryTailNameExists(String tailName, String machineRoom, Long spaceId) { - Sql sql = Sqls.queryEntity("SELECT la.* FROM milog_logstail la LEFT JOIN milog_logstore lt ON la.store_id = lt.id WHERE la.tail = @tailName AND lt.machine_room = @machineRoom"); + Sql sql = Sqls.queryEntity("SELECT la.* FROM milog_logstail la LEFT JOIN milog_logstore lt ON la.store_id = lt.id WHERE la.tail = @tailName AND lt.machine_room = @machineRoom and la.space_id = @spaceId"); sql.params().set("tailName", tailName); sql.params().set("machineRoom", machineRoom); - if (null != spaceId) { - sql.params().set("store_id", spaceId); - } + sql.params().set("spaceId", spaceId); sql.setEntity(dao.getEntity(MilogLogTailDo.class)); dao.execute(sql); return sql.getList(MilogLogTailDo.class); diff --git a/ozhera-log/log-stream/src/main/java/com/xiaomi/mone/log/stream/plugin/es/EsPlugin.java b/ozhera-log/log-stream/src/main/java/com/xiaomi/mone/log/stream/plugin/es/EsPlugin.java index d9b0b8db..21bd372e 100644 --- a/ozhera-log/log-stream/src/main/java/com/xiaomi/mone/log/stream/plugin/es/EsPlugin.java +++ b/ozhera-log/log-stream/src/main/java/com/xiaomi/mone/log/stream/plugin/es/EsPlugin.java @@ -139,26 +139,30 @@ public void beforeBulk(long executionId, BulkRequest request) { @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - log.debug("success send to es,desc:{}", request.getDescription()); - AtomicInteger count = new AtomicInteger(); - response.spliterator().forEachRemaining(x -> { - if (x.isFailed()) { - BulkItemResponse.Failure failure = x.getFailure(); - String msg = String.format( - "Index:[%s], type:[%s], id:[%s], itemId:[%s], opt:[%s], version:[%s], errMsg:%s" - , x.getIndex() - , x.getType() - , x.getId() - , x.getItemId() - , x.getOpType().getLowercase() - , x.getVersion() - , failure.getCause().getMessage() - ); - log.error("Bulk executionId:[{}] has error messages:\t{}", executionId, msg); - count.incrementAndGet(); - } - }); - log.debug("Finished handling bulk commit executionId:[{}] for {} requests with {} errors", executionId, request.numberOfActions(), count.intValue()); + if (response.hasFailures()) { + AtomicInteger count = new AtomicInteger(); + response.spliterator().forEachRemaining(x -> { + if (x.isFailed()) { + BulkItemResponse.Failure failure = x.getFailure(); + String msg = String.format( + "Index:[%s], type:[%s], id:[%s], itemId:[%s], opt:[%s], version:[%s], errMsg:%s" + , x.getIndex() + , x.getType() + , x.getId() + , x.getItemId() + , x.getOpType().getLowercase() + , x.getVersion() + , failure.getCause().getMessage() + ); + log.error("Bulk executionId:[{}] has error messages:{}", executionId, msg); + count.incrementAndGet(); + } + }); + log.debug("Finished handling bulk commit executionId:[{}] for {} requests with {} errors", executionId, request.numberOfActions(), count.intValue()); + sendMessageToTopic(request, esInfo, onFailedConsumer); + } else { + log.debug("success send to es,desc:{}", request.getDescription()); + } } @Override @@ -172,35 +176,39 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) , clazz.getTypeName() , clazz.getCanonicalName() , failure.getMessage()); - MqMessageDTO MqMessageDTO = new MqMessageDTO(); - MqMessageDTO.setEsInfo(esInfo); - List compensateMqDTOS = Lists.newArrayList(); - request.requests().stream().filter(x -> x instanceof IndexRequest) - .forEach(x -> { - Map source = ((IndexRequest) x).sourceAsMap(); - log.error("Failure to handle index:[{}], type:[{}],id:[{}] data:[{}]", x.index(), x.type(), x.id(), JSON.toJSONString(source)); - MqMessageDTO.CompensateMqDTO compensateMqDTO = new MqMessageDTO.CompensateMqDTO(); - compensateMqDTO.setMsg(JSON.toJSONString(source)); - compensateMqDTO.setEsIndex(x.index()); - compensateMqDTOS.add(compensateMqDTO); - }); - //The message is sent to mq for consumption - the data cannot be larger than 10M, otherwise it cannot be written, divided into 2 parts - int length = JSON.toJSONString(compensateMqDTOS).getBytes().length; - if (length > SINGLE_MESSAGE_BYTES_MAXIMAL) { - List> splitList = ListUtil.partition(compensateMqDTOS, 2); - for (List mqDTOS : splitList) { - MqMessageDTO.setCompensateMqDTOS(mqDTOS); - onFailedConsumer.accept(MqMessageDTO); - } - } else { - MqMessageDTO.setCompensateMqDTOS(compensateMqDTOS); - onFailedConsumer.accept(MqMessageDTO); - } + sendMessageToTopic(request, esInfo, onFailedConsumer); } })); return esProcessor; } + private static void sendMessageToTopic(BulkRequest request, StorageInfo esInfo, Consumer onFailedConsumer) { + MqMessageDTO MqMessageDTO = new MqMessageDTO(); + MqMessageDTO.setEsInfo(esInfo); + List compensateMqDTOS = Lists.newArrayList(); + request.requests().stream().filter(x -> x instanceof IndexRequest) + .forEach(x -> { + Map source = ((IndexRequest) x).sourceAsMap(); + log.error("Failure to handle index:[{}], type:[{}],id:[{}] data:[{}]", x.index(), x.type(), x.id(), JSON.toJSONString(source)); + MqMessageDTO.CompensateMqDTO compensateMqDTO = new MqMessageDTO.CompensateMqDTO(); + compensateMqDTO.setMsg(JSON.toJSONString(source)); + compensateMqDTO.setEsIndex(x.index()); + compensateMqDTOS.add(compensateMqDTO); + }); + //The message is sent to mq for consumption - the data cannot be larger than 10M, otherwise it cannot be written, divided into 2 parts + int length = JSON.toJSONString(compensateMqDTOS).getBytes().length; + if (length > SINGLE_MESSAGE_BYTES_MAXIMAL) { + List> splitList = ListUtil.partition(compensateMqDTOS, 2); + for (List mqDTOS : splitList) { + MqMessageDTO.setCompensateMqDTOS(mqDTOS); + onFailedConsumer.accept(MqMessageDTO); + } + } else { + MqMessageDTO.setCompensateMqDTOS(compensateMqDTOS); + onFailedConsumer.accept(MqMessageDTO); + } + } + private static String cacheKey(StorageInfo esInfo) { StringBuilder stringBuilder = new StringBuilder(); stringBuilder.append(esInfo.getId()).append(","); From 61819bc45be8a7097ee0e349659c1d2bebe6e888 Mon Sep 17 00:00:00 2001 From: wtt <1136220284@qq.com> Date: Fri, 10 May 2024 10:39:55 +0800 Subject: [PATCH 2/3] refactor: update comment --- .../xiaomi/mone/test/SpringBootFeatTest.java | 17 ++++++++++++++++- .../java/com/xiaomi/mone/test/UserBean.java | 15 +++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/ozhera-app/app-server/src/test/java/com/xiaomi/mone/test/SpringBootFeatTest.java b/ozhera-app/app-server/src/test/java/com/xiaomi/mone/test/SpringBootFeatTest.java index 8f6a12b3..a4b41ae2 100644 --- a/ozhera-app/app-server/src/test/java/com/xiaomi/mone/test/SpringBootFeatTest.java +++ b/ozhera-app/app-server/src/test/java/com/xiaomi/mone/test/SpringBootFeatTest.java @@ -1,3 +1,18 @@ +/* + * Copyright 2020 Xiaomi + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.xiaomi.mone.test; import com.xiaomi.mone.app.AppBootstrap; @@ -19,7 +34,7 @@ * @date 2024/4/28 10:02 * */ -//@SpringBootTest(classes = AppBootstrap.class) +@SpringBootTest(classes = AppBootstrap.class) public class SpringBootFeatTest { @Resource diff --git a/ozhera-app/app-server/src/test/java/com/xiaomi/mone/test/UserBean.java b/ozhera-app/app-server/src/test/java/com/xiaomi/mone/test/UserBean.java index 3490fecd..20ef35ac 100644 --- a/ozhera-app/app-server/src/test/java/com/xiaomi/mone/test/UserBean.java +++ b/ozhera-app/app-server/src/test/java/com/xiaomi/mone/test/UserBean.java @@ -1,3 +1,18 @@ +/* + * Copyright 2020 Xiaomi + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.xiaomi.mone.test; import lombok.Data; From 3f54591fd25ed2d5644ca8e52190fb3807f0d1fc Mon Sep 17 00:00:00 2001 From: wtt <1136220284@qq.com> Date: Fri, 10 May 2024 10:42:55 +0800 Subject: [PATCH 3/3] refactor: update --- .../test/java/com/xiaomi/mone/test/SpringBootFeatTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ozhera-app/app-server/src/test/java/com/xiaomi/mone/test/SpringBootFeatTest.java b/ozhera-app/app-server/src/test/java/com/xiaomi/mone/test/SpringBootFeatTest.java index a4b41ae2..2ad51701 100644 --- a/ozhera-app/app-server/src/test/java/com/xiaomi/mone/test/SpringBootFeatTest.java +++ b/ozhera-app/app-server/src/test/java/com/xiaomi/mone/test/SpringBootFeatTest.java @@ -34,7 +34,7 @@ * @date 2024/4/28 10:02 * */ -@SpringBootTest(classes = AppBootstrap.class) +//@SpringBootTest(classes = AppBootstrap.class) public class SpringBootFeatTest { @Resource @@ -42,7 +42,7 @@ public class SpringBootFeatTest { @Test public void validatorSpringTest() { - validator(validator); +// validator(validator); } private void validator(Validator validator) {