From 236887de46675abd5c6953a7dc4d04039bc9432a Mon Sep 17 00:00:00 2001 From: WujieRen Date: Sun, 2 Apr 2023 15:29:47 +0800 Subject: [PATCH] [#218][feat] Auto-start services when worker restart. --- .../api/master/WorkerStartActor.java | 49 +++++++++-- .../service/ClusterServiceCommandService.java | 3 + .../ClusterServiceRoleInstanceService.java | 2 + .../ClusterServiceCommandServiceImpl.java | 87 ++++++++++--------- ...ClusterServiceRoleInstanceServiceImpl.java | 9 ++ 5 files changed, 100 insertions(+), 50 deletions(-) diff --git a/datasophon-api/src/main/java/com/datasophon/api/master/WorkerStartActor.java b/datasophon-api/src/main/java/com/datasophon/api/master/WorkerStartActor.java index a8a7d741..b073fe31 100644 --- a/datasophon-api/src/main/java/com/datasophon/api/master/WorkerStartActor.java +++ b/datasophon-api/src/main/java/com/datasophon/api/master/WorkerStartActor.java @@ -33,17 +33,18 @@ import com.datasophon.common.enums.InstallState; import com.datasophon.common.model.HostInfo; import com.datasophon.common.model.StartWorkerMessage; +import com.datasophon.common.utils.CollectionUtils; +import com.datasophon.common.utils.Result; import com.datasophon.dao.entity.ClusterHostEntity; import com.datasophon.dao.entity.ClusterInfoEntity; import com.datasophon.dao.entity.ClusterServiceRoleInstanceEntity; import com.datasophon.dao.enums.MANAGED; -import com.datasophon.dao.enums.ServiceRoleState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.List; -import java.util.Objects; -import java.util.stream.Collectors; + +import java.util.*; + +import static java.util.stream.Collectors.*; public class WorkerStartActor extends UntypedActor { @@ -81,14 +82,46 @@ public void onReceive(Object message) throws Throwable { hostEntity.setManaged(MANAGED.YES); clusterHostService.updateById(hostEntity); } - //tell to worker what need to start - //add to prometheus - ActorRef prometheusActor = ActorUtils.getLocalActor(PrometheusActor.class,ActorUtils.getActorRefName(PrometheusActor.class)); + ActorRef prometheusActor = ActorUtils.getLocalActor(PrometheusActor.class, ActorUtils.getActorRefName(PrometheusActor.class)); GenerateHostPrometheusConfig prometheusConfigCommand = new GenerateHostPrometheusConfig(); prometheusConfigCommand.setClusterId(cluster.getId()); prometheusActor.tell(prometheusConfigCommand, getSelf()); + + //tell to worker what need to start + autoStartServiceNeeded(msg.getHostname(), cluster.getId()); } } + + /** + * Automatically start services that need to be started + * + * @param clusterId + */ + private void autoStartServiceNeeded(String hostname, Integer clusterId) { + ClusterServiceRoleInstanceService roleInstanceService = SpringTool.getApplicationContext().getBean(ClusterServiceRoleInstanceService.class); + ClusterServiceCommandService serviceCommandService = SpringTool.getApplicationContext().getBean(ClusterServiceCommandService.class); + + List serviceRoleList = roleInstanceService.getStoppedServiceRoleListByHostnameAndClusterId(hostname, clusterId); + if (CollectionUtils.isEmpty(serviceRoleList)) { + logger.info("No services need to start at host {}.", hostname); + return; + } + + Map> serviceRoleMap = serviceRoleList.stream() + .collect( + groupingBy( + ClusterServiceRoleInstanceEntity::getServiceId, + mapping(i -> String.valueOf(i.getId()), toList()) + ) + ); + Result result = serviceCommandService.generateServiceRoleCommands(clusterId, CommandType.START_SERVICE, serviceRoleMap); + if (result.getCode() == 200) { + logger.info("Auto-start services successful"); + } else { + logger.info("Some service auto-start failed, please check logs of the services that failed to start."); + } + } + } diff --git a/datasophon-api/src/main/java/com/datasophon/api/service/ClusterServiceCommandService.java b/datasophon-api/src/main/java/com/datasophon/api/service/ClusterServiceCommandService.java index ada7946d..d0a6c50c 100644 --- a/datasophon-api/src/main/java/com/datasophon/api/service/ClusterServiceCommandService.java +++ b/datasophon-api/src/main/java/com/datasophon/api/service/ClusterServiceCommandService.java @@ -25,6 +25,7 @@ import java.util.List; +import java.util.Map; /** * 集群服务操作指令表 @@ -41,6 +42,8 @@ public interface ClusterServiceCommandService extends IService ids); + Result generateServiceRoleCommands(Integer clusterId, CommandType commandType, Map> instanceIdMap); + Result generateServiceRoleCommand(Integer clusterId, CommandType command, Integer serviceIntanceId, List ids); void startExecuteCommand(Integer clusterId, String commandType, String commandIds); diff --git a/datasophon-api/src/main/java/com/datasophon/api/service/ClusterServiceRoleInstanceService.java b/datasophon-api/src/main/java/com/datasophon/api/service/ClusterServiceRoleInstanceService.java index 808c424b..9a15964d 100644 --- a/datasophon-api/src/main/java/com/datasophon/api/service/ClusterServiceRoleInstanceService.java +++ b/datasophon-api/src/main/java/com/datasophon/api/service/ClusterServiceRoleInstanceService.java @@ -34,6 +34,8 @@ */ public interface ClusterServiceRoleInstanceService extends IService { + List getStoppedServiceRoleListByHostnameAndClusterId(String hostname, Integer clusterId); + List getServiceRoleListByHostnameAndClusterId(String hostname, Integer clusterId); List getServiceRoleInstanceListByServiceIdAndRoleState(Integer id, ServiceRoleState stop); diff --git a/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterServiceCommandServiceImpl.java b/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterServiceCommandServiceImpl.java index b87d32d5..666752f5 100644 --- a/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterServiceCommandServiceImpl.java +++ b/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterServiceCommandServiceImpl.java @@ -87,7 +87,7 @@ public class ClusterServiceCommandServiceImpl extends ServiceImpl serviceNames) { + public Result generateCommand(Integer clusterId, CommandType commandType, List serviceNames) { ClusterInfoEntity clusterInfo = clusterInfoService.getById(clusterId); List list = new ArrayList<>(); @@ -95,13 +95,13 @@ public Result generateCommand(Integer clusterId, CommandType commandType,List hostCommandList = new ArrayList<>(); List commandIds = new ArrayList(); - Map> serviceRoleHostMap = (Map>) CacheUtils.get(clusterInfo.getClusterCode() +Constants.UNDERLINE+ Constants.SERVICE_ROLE_HOST_MAPPING); + Map> serviceRoleHostMap = (Map>) CacheUtils.get(clusterInfo.getClusterCode() + Constants.UNDERLINE + Constants.SERVICE_ROLE_HOST_MAPPING); - for (String serviceName : serviceNames) { + for (String serviceName : serviceNames) { //1、生成操作指令 ClusterServiceInstanceEntity serviceInstance = serviceInstanceService.getServiceInstanceByClusterIdAndServiceName(clusterId, serviceName); - ClusterServiceCommandEntity commandEntity = ProcessUtils.generateCommandEntity(clusterId,commandType,serviceName); + ClusterServiceCommandEntity commandEntity = ProcessUtils.generateCommandEntity(clusterId, commandType, serviceName); commandEntity.setServiceInstanceId(serviceInstance.getId()); list.add(commandEntity); String commandId = commandEntity.getCommandId(); @@ -113,22 +113,22 @@ public Result generateCommand(Integer clusterId, CommandType commandType,List serviceRoleList = (List) result.getData(); HashMap map = new HashMap<>(); for (FrameServiceRoleEntity serviceRole : serviceRoleList) { - if(Objects.nonNull(serviceRoleHostMap) && serviceRoleHostMap.containsKey(serviceRole.getServiceRoleName())){ + if (Objects.nonNull(serviceRoleHostMap) && serviceRoleHostMap.containsKey(serviceRole.getServiceRoleName())) { List hosts = serviceRoleHostMap.get(serviceRole.getServiceRoleName()); for (String hostname : hosts) { - if(alreadyExistsServiceRole(serviceRole.getServiceRoleName(),hostname,clusterId)){ + if (alreadyExistsServiceRole(serviceRole.getServiceRoleName(), hostname, clusterId)) { continue; - }else{ + } else { ClusterServiceCommandHostEntity commandHost; - if(map.containsKey(hostname)){ + if (map.containsKey(hostname)) { commandHost = map.get(hostname); - }else{ + } else { commandHost = ProcessUtils.generateCommandHostEntity(commandId, hostname); commandHostList.add(commandHost); - map.put(hostname,commandHost); + map.put(hostname, commandHost); } //4、生成主机操作指令 - ClusterServiceCommandHostCommandEntity hostCommand = ProcessUtils.generateCommandHostCommandEntity(commandType, commandId, serviceRole.getServiceRoleName(),serviceRole.getServiceRoleType(), commandHost); + ClusterServiceCommandHostCommandEntity hostCommand = ProcessUtils.generateCommandHostCommandEntity(commandType, commandId, serviceRole.getServiceRoleName(), serviceRole.getServiceRoleType(), commandHost); hostCommandList.add(hostCommand); } } @@ -138,12 +138,12 @@ public Result generateCommand(Integer clusterId, CommandType commandType,List list = this.list(new QueryWrapper() - .orderByDesc(Constants.CREATE_TIME) - .last("limit " + offset + "," + pageSize)); + List list = this.list(new QueryWrapper().orderByDesc(Constants.CREATE_TIME).last("limit " + offset + "," + pageSize)); Integer total = this.count(); for (ClusterServiceCommandEntity commandEntity : list) { commandEntity.setCommandStateCode(commandEntity.getCommandState().getValue()); Date createTime = commandEntity.getCreateTime(); Date endTime = commandEntity.getEndTime(); - if(Objects.isNull(endTime)){ + if (Objects.isNull(endTime)) { endTime = new Date(); } long between = DateUtil.between(createTime, endTime, DateUnit.MS); @@ -175,6 +173,7 @@ public Result getServiceCommandlist(Integer clusterId, Integer page, Integer pag * 1、生成指令 * 2、生成主机指令 * 3、生产主机上操作指令 + * * @param clusterId * @param commandType * @param serviceInstanceIds @@ -190,7 +189,7 @@ public Result generateServiceCommand(Integer clusterId, CommandType commandType, int id = Integer.parseInt(serviceInstanceId); //查询服务对应的服务角色实例 List roleInstanceList = roleInstanceService.getServiceRoleInstanceListByServiceId(id); - if(Objects.isNull(roleInstanceList) || roleInstanceList.size() == 0){ + if (Objects.isNull(roleInstanceList) || roleInstanceList.size() == 0) { continue; } ClusterServiceInstanceEntity serviceInstance = serviceInstanceService.getById(id); @@ -203,31 +202,40 @@ public Result generateServiceCommand(Integer clusterId, CommandType commandType, HashMap map = new HashMap<>(); for (ClusterServiceRoleInstanceEntity roleInstance : roleInstanceList) { ClusterServiceCommandHostEntity commandHost; - if(map.containsKey(roleInstance.getHostname())){ + if (map.containsKey(roleInstance.getHostname())) { commandHost = map.get(roleInstance.getHostname()); - }else { + } else { commandHost = ProcessUtils.generateCommandHostEntity(commandId, roleInstance.getHostname()); commandHostList.add(commandHost); } - ClusterServiceCommandHostCommandEntity hostCommand = ProcessUtils.generateCommandHostCommandEntity(commandType, commandId, roleInstance.getServiceRoleName(),roleInstance.getRoleType(), commandHost); + ClusterServiceCommandHostCommandEntity hostCommand = ProcessUtils.generateCommandHostCommandEntity(commandType, commandId, roleInstance.getServiceRoleName(), roleInstance.getRoleType(), commandHost); hostCommandList.add(hostCommand); - map.put(roleInstance.getHostname(),commandHost); + map.put(roleInstance.getHostname(), commandHost); } } - if(list.size() > 0){ + if (list.size() > 0) { commandService.saveBatch(list); commandHostService.saveBatch(commandHostList); hostCommandService.saveBatch(hostCommandList); //通知commandActor执行命令 - ActorRef dagBuildActor = ActorUtils.getLocalActor(DAGBuildActor.class,ActorUtils.getActorRefName(DAGBuildActor.class)); - dagBuildActor.tell(new StartExecuteCommandCommand(commandIds,clusterId, commandType),ActorRef.noSender()); + ActorRef dagBuildActor = ActorUtils.getLocalActor(DAGBuildActor.class, ActorUtils.getActorRefName(DAGBuildActor.class)); + dagBuildActor.tell(new StartExecuteCommandCommand(commandIds, clusterId, commandType), ActorRef.noSender()); } - return Result.success(String.join(",",commandIds)); + return Result.success(String.join(",", commandIds)); } @Override - public Result generateServiceRoleCommand(Integer clusterId, CommandType commandType, Integer serviceInstanceId,List serviceRoleInstanceIds) { + public Result generateServiceRoleCommands(Integer clusterId, CommandType commandType, Map> instanceIdMap) { + Result result = null; + for (Map.Entry> entry : instanceIdMap.entrySet()) { + result = generateServiceRoleCommand(clusterId, commandType, entry.getKey(), entry.getValue()); + } + return result; + } + + @Override + public Result generateServiceRoleCommand(Integer clusterId, CommandType commandType, Integer serviceInstanceId, List serviceRoleInstanceIds) { List list = new ArrayList<>(); List commandHostList = new ArrayList<>(); List hostCommandList = new ArrayList<>(); @@ -246,24 +254,24 @@ public Result generateServiceRoleCommand(Integer clusterId, CommandType commandT ClusterServiceRoleInstanceEntity roleInstance = roleInstanceService.getById(id); ClusterServiceCommandHostEntity commandHost; - if(map.containsKey(roleInstance.getHostname())){ + if (map.containsKey(roleInstance.getHostname())) { commandHost = map.get(roleInstance.getHostname()); - }else { + } else { commandHost = ProcessUtils.generateCommandHostEntity(commandId, roleInstance.getHostname()); commandHostList.add(commandHost); } - ClusterServiceCommandHostCommandEntity hostCommand = ProcessUtils.generateCommandHostCommandEntity(commandType, commandId, roleInstance.getServiceRoleName(),roleInstance.getRoleType(), commandHost); + ClusterServiceCommandHostCommandEntity hostCommand = ProcessUtils.generateCommandHostCommandEntity(commandType, commandId, roleInstance.getServiceRoleName(), roleInstance.getRoleType(), commandHost); hostCommandList.add(hostCommand); - map.put(roleInstance.getHostname(),commandHost); + map.put(roleInstance.getHostname(), commandHost); } commandService.saveBatch(list); commandHostService.saveBatch(commandHostList); hostCommandService.saveBatch(hostCommandList); //通知commandActor执行命令 - ActorRef dagBuildActor = ActorUtils.getLocalActor(DAGBuildActor.class,ActorUtils.getActorRefName(DAGBuildActor.class)); - dagBuildActor.tell(new StartExecuteCommandCommand(commandIds,clusterId, commandType),ActorRef.noSender()); - return Result.success(String.join(",",commandIds)); + ActorRef dagBuildActor = ActorUtils.getLocalActor(DAGBuildActor.class, ActorUtils.getActorRefName(DAGBuildActor.class)); + dagBuildActor.tell(new StartExecuteCommandCommand(commandIds, clusterId, commandType), ActorRef.noSender()); + return Result.success(String.join(",", commandIds)); } @Override @@ -271,8 +279,8 @@ public void startExecuteCommand(Integer clusterId, String commandType, String co List list = Arrays.asList(commandIds.split(",")); CommandType command = EnumUtil.fromString(CommandType.class, commandType); //通知commandActor执行命令 - ActorRef dagBuildActor = ActorUtils.getLocalActor(DAGBuildActor.class,ActorUtils.getActorRefName(DAGBuildActor.class)); - dagBuildActor.tell(new StartExecuteCommandCommand(list,clusterId, command),ActorRef.noSender()); + ActorRef dagBuildActor = ActorUtils.getLocalActor(DAGBuildActor.class, ActorUtils.getActorRefName(DAGBuildActor.class)); + dagBuildActor.tell(new StartExecuteCommandCommand(list, clusterId, command), ActorRef.noSender()); } @Override @@ -283,11 +291,6 @@ public void cancelCommand(String commandId) { @Override public ClusterServiceCommandEntity getLastRestartCommand(Integer serviceInstanceId) { - return this.getOne(new QueryWrapper() - .eq(Constants.SERVICE_INSTANCE_ID,serviceInstanceId) - .eq(Constants.COMMAND_TYPE,CommandType.RESTART_SERVICE.getValue()) - .or() - .eq(Constants.COMMAND_TYPE,CommandType.INSTALL_SERVICE.getValue()) - .orderByDesc(Constants.CREATE_TIME).last("limit 1")); + return this.getOne(new QueryWrapper().eq(Constants.SERVICE_INSTANCE_ID, serviceInstanceId).eq(Constants.COMMAND_TYPE, CommandType.RESTART_SERVICE.getValue()).or().eq(Constants.COMMAND_TYPE, CommandType.INSTALL_SERVICE.getValue()).orderByDesc(Constants.CREATE_TIME).last("limit 1")); } } diff --git a/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterServiceRoleInstanceServiceImpl.java b/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterServiceRoleInstanceServiceImpl.java index 7f92fa3d..50beafb0 100644 --- a/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterServiceRoleInstanceServiceImpl.java +++ b/datasophon-api/src/main/java/com/datasophon/api/service/impl/ClusterServiceRoleInstanceServiceImpl.java @@ -86,6 +86,15 @@ public class ClusterServiceRoleInstanceServiceImpl extends ServiceImpl getStoppedServiceRoleListByHostnameAndClusterId(String hostname, Integer clusterId) { + return this.lambdaQuery() + .eq(ClusterServiceRoleInstanceEntity::getClusterId, clusterId) + .eq(ClusterServiceRoleInstanceEntity::getHostname, hostname) + .eq(ClusterServiceRoleInstanceEntity::getServiceRoleState, ServiceRoleState.STOP) + .list(); + } + @Override public List getServiceRoleListByHostnameAndClusterId(String hostname, Integer clusterId) { return this.list(new QueryWrapper()