Skip to content

Commit

Permalink
[#218][feat] Auto-start services when worker restart. (#231)
Browse files Browse the repository at this point in the history
  • Loading branch information
WujieRen authored Apr 3, 2023
1 parent 0275fd3 commit ae038a3
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,18 @@
import com.datasophon.common.enums.InstallState;
import com.datasophon.common.model.HostInfo;
import com.datasophon.common.model.StartWorkerMessage;
import com.datasophon.common.utils.CollectionUtils;
import com.datasophon.common.utils.Result;
import com.datasophon.dao.entity.ClusterHostEntity;
import com.datasophon.dao.entity.ClusterInfoEntity;
import com.datasophon.dao.entity.ClusterServiceRoleInstanceEntity;
import com.datasophon.dao.enums.MANAGED;
import com.datasophon.dao.enums.ServiceRoleState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import java.util.*;

import static java.util.stream.Collectors.*;

public class WorkerStartActor extends UntypedActor {

Expand Down Expand Up @@ -81,14 +82,46 @@ public void onReceive(Object message) throws Throwable {
hostEntity.setManaged(MANAGED.YES);
clusterHostService.updateById(hostEntity);
}
//tell to worker what need to start


//add to prometheus
ActorRef prometheusActor = ActorUtils.getLocalActor(PrometheusActor.class,ActorUtils.getActorRefName(PrometheusActor.class));
ActorRef prometheusActor = ActorUtils.getLocalActor(PrometheusActor.class, ActorUtils.getActorRefName(PrometheusActor.class));
GenerateHostPrometheusConfig prometheusConfigCommand = new GenerateHostPrometheusConfig();
prometheusConfigCommand.setClusterId(cluster.getId());
prometheusActor.tell(prometheusConfigCommand, getSelf());

//tell to worker what need to start
autoStartServiceNeeded(msg.getHostname(), cluster.getId());
}
}

/**
* Automatically start services that need to be started
*
* @param clusterId
*/
private void autoStartServiceNeeded(String hostname, Integer clusterId) {
ClusterServiceRoleInstanceService roleInstanceService = SpringTool.getApplicationContext().getBean(ClusterServiceRoleInstanceService.class);
ClusterServiceCommandService serviceCommandService = SpringTool.getApplicationContext().getBean(ClusterServiceCommandService.class);

List<ClusterServiceRoleInstanceEntity> serviceRoleList = roleInstanceService.getStoppedServiceRoleListByHostnameAndClusterId(hostname, clusterId);
if (CollectionUtils.isEmpty(serviceRoleList)) {
logger.info("No services need to start at host {}.", hostname);
return;
}

Map<Integer, List<String>> serviceRoleMap = serviceRoleList.stream()
.collect(
groupingBy(
ClusterServiceRoleInstanceEntity::getServiceId,
mapping(i -> String.valueOf(i.getId()), toList())
)
);
Result result = serviceCommandService.generateServiceRoleCommands(clusterId, CommandType.START_SERVICE, serviceRoleMap);
if (result.getCode() == 200) {
logger.info("Auto-start services successful");
} else {
logger.info("Some service auto-start failed, please check logs of the services that failed to start.");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@


import java.util.List;
import java.util.Map;

/**
* 集群服务操作指令表
Expand All @@ -41,6 +42,8 @@ public interface ClusterServiceCommandService extends IService<ClusterServiceCom

Result generateServiceCommand(Integer clusterId, CommandType command, List<String> ids);

Result generateServiceRoleCommands(Integer clusterId, CommandType commandType, Map<Integer, List<String>> instanceIdMap);

Result generateServiceRoleCommand(Integer clusterId, CommandType command, Integer serviceIntanceId, List<String> ids);

void startExecuteCommand(Integer clusterId, String commandType, String commandIds);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
*/
public interface ClusterServiceRoleInstanceService extends IService<ClusterServiceRoleInstanceEntity> {

List<ClusterServiceRoleInstanceEntity> getStoppedServiceRoleListByHostnameAndClusterId(String hostname, Integer clusterId);

List<ClusterServiceRoleInstanceEntity> getServiceRoleListByHostnameAndClusterId(String hostname, Integer clusterId);

List<ClusterServiceRoleInstanceEntity> getServiceRoleInstanceListByServiceIdAndRoleState(Integer id, ServiceRoleState stop);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,21 +87,21 @@ public class ClusterServiceCommandServiceImpl extends ServiceImpl<ClusterService

@Override
@Transactional
public Result generateCommand(Integer clusterId, CommandType commandType,List<String> serviceNames) {
public Result generateCommand(Integer clusterId, CommandType commandType, List<String> serviceNames) {
ClusterInfoEntity clusterInfo = clusterInfoService.getById(clusterId);

List<ClusterServiceCommandEntity> list = new ArrayList<>();
List<ClusterServiceCommandHostEntity> commandHostList = new ArrayList<>();
List<ClusterServiceCommandHostCommandEntity> hostCommandList = new ArrayList<>();
List<String> commandIds = new ArrayList<String>();

Map<String, List<String>> serviceRoleHostMap = (Map<String, List<String>>) CacheUtils.get(clusterInfo.getClusterCode() +Constants.UNDERLINE+ Constants.SERVICE_ROLE_HOST_MAPPING);
Map<String, List<String>> serviceRoleHostMap = (Map<String, List<String>>) CacheUtils.get(clusterInfo.getClusterCode() + Constants.UNDERLINE + Constants.SERVICE_ROLE_HOST_MAPPING);

for (String serviceName : serviceNames) {
for (String serviceName : serviceNames) {
//1、生成操作指令
ClusterServiceInstanceEntity serviceInstance = serviceInstanceService.getServiceInstanceByClusterIdAndServiceName(clusterId, serviceName);

ClusterServiceCommandEntity commandEntity = ProcessUtils.generateCommandEntity(clusterId,commandType,serviceName);
ClusterServiceCommandEntity commandEntity = ProcessUtils.generateCommandEntity(clusterId, commandType, serviceName);
commandEntity.setServiceInstanceId(serviceInstance.getId());
list.add(commandEntity);
String commandId = commandEntity.getCommandId();
Expand All @@ -113,22 +113,22 @@ public Result generateCommand(Integer clusterId, CommandType commandType,List<St
List<FrameServiceRoleEntity> serviceRoleList = (List<FrameServiceRoleEntity>) result.getData();
HashMap<String, ClusterServiceCommandHostEntity> map = new HashMap<>();
for (FrameServiceRoleEntity serviceRole : serviceRoleList) {
if(Objects.nonNull(serviceRoleHostMap) && serviceRoleHostMap.containsKey(serviceRole.getServiceRoleName())){
if (Objects.nonNull(serviceRoleHostMap) && serviceRoleHostMap.containsKey(serviceRole.getServiceRoleName())) {
List<String> hosts = serviceRoleHostMap.get(serviceRole.getServiceRoleName());
for (String hostname : hosts) {
if(alreadyExistsServiceRole(serviceRole.getServiceRoleName(),hostname,clusterId)){
if (alreadyExistsServiceRole(serviceRole.getServiceRoleName(), hostname, clusterId)) {
continue;
}else{
} else {
ClusterServiceCommandHostEntity commandHost;
if(map.containsKey(hostname)){
if (map.containsKey(hostname)) {
commandHost = map.get(hostname);
}else{
} else {
commandHost = ProcessUtils.generateCommandHostEntity(commandId, hostname);
commandHostList.add(commandHost);
map.put(hostname,commandHost);
map.put(hostname, commandHost);
}
//4、生成主机操作指令
ClusterServiceCommandHostCommandEntity hostCommand = ProcessUtils.generateCommandHostCommandEntity(commandType, commandId, serviceRole.getServiceRoleName(),serviceRole.getServiceRoleType(), commandHost);
ClusterServiceCommandHostCommandEntity hostCommand = ProcessUtils.generateCommandHostCommandEntity(commandType, commandId, serviceRole.getServiceRoleName(), serviceRole.getServiceRoleType(), commandHost);
hostCommandList.add(hostCommand);
}
}
Expand All @@ -138,12 +138,12 @@ public Result generateCommand(Integer clusterId, CommandType commandType,List<St
commandService.saveBatch(list);
commandHostService.saveBatch(commandHostList);
hostCommandService.saveBatch(hostCommandList);
return Result.success(String.join(",",commandIds));
return Result.success(String.join(",", commandIds));
}

private boolean alreadyExistsServiceRole(String serviceRoleName, String hostname, Integer clusterId) {
ClusterServiceRoleInstanceEntity serviceRole = roleInstanceService.getOneServiceRole(serviceRoleName, hostname, clusterId);
if(Objects.nonNull(serviceRole) ){
if (Objects.nonNull(serviceRole)) {
return true;
}
return false;
Expand All @@ -153,15 +153,13 @@ private boolean alreadyExistsServiceRole(String serviceRoleName, String hostname
@Override
public Result getServiceCommandlist(Integer clusterId, Integer page, Integer pageSize) {
Integer offset = (page - 1) * pageSize;
List<ClusterServiceCommandEntity> list = this.list(new QueryWrapper<ClusterServiceCommandEntity>()
.orderByDesc(Constants.CREATE_TIME)
.last("limit " + offset + "," + pageSize));
List<ClusterServiceCommandEntity> list = this.list(new QueryWrapper<ClusterServiceCommandEntity>().orderByDesc(Constants.CREATE_TIME).last("limit " + offset + "," + pageSize));
Integer total = this.count();
for (ClusterServiceCommandEntity commandEntity : list) {
commandEntity.setCommandStateCode(commandEntity.getCommandState().getValue());
Date createTime = commandEntity.getCreateTime();
Date endTime = commandEntity.getEndTime();
if(Objects.isNull(endTime)){
if (Objects.isNull(endTime)) {
endTime = new Date();
}
long between = DateUtil.between(createTime, endTime, DateUnit.MS);
Expand All @@ -175,6 +173,7 @@ public Result getServiceCommandlist(Integer clusterId, Integer page, Integer pag
* 1、生成指令
* 2、生成主机指令
* 3、生产主机上操作指令
*
* @param clusterId
* @param commandType
* @param serviceInstanceIds
Expand All @@ -190,7 +189,7 @@ public Result generateServiceCommand(Integer clusterId, CommandType commandType,
int id = Integer.parseInt(serviceInstanceId);
//查询服务对应的服务角色实例
List<ClusterServiceRoleInstanceEntity> roleInstanceList = roleInstanceService.getServiceRoleInstanceListByServiceId(id);
if(Objects.isNull(roleInstanceList) || roleInstanceList.size() == 0){
if (Objects.isNull(roleInstanceList) || roleInstanceList.size() == 0) {
continue;
}
ClusterServiceInstanceEntity serviceInstance = serviceInstanceService.getById(id);
Expand All @@ -203,31 +202,40 @@ public Result generateServiceCommand(Integer clusterId, CommandType commandType,
HashMap<String, ClusterServiceCommandHostEntity> map = new HashMap<>();
for (ClusterServiceRoleInstanceEntity roleInstance : roleInstanceList) {
ClusterServiceCommandHostEntity commandHost;
if(map.containsKey(roleInstance.getHostname())){
if (map.containsKey(roleInstance.getHostname())) {
commandHost = map.get(roleInstance.getHostname());
}else {
} else {
commandHost = ProcessUtils.generateCommandHostEntity(commandId, roleInstance.getHostname());
commandHostList.add(commandHost);
}
ClusterServiceCommandHostCommandEntity hostCommand = ProcessUtils.generateCommandHostCommandEntity(commandType, commandId, roleInstance.getServiceRoleName(),roleInstance.getRoleType(), commandHost);
ClusterServiceCommandHostCommandEntity hostCommand = ProcessUtils.generateCommandHostCommandEntity(commandType, commandId, roleInstance.getServiceRoleName(), roleInstance.getRoleType(), commandHost);
hostCommandList.add(hostCommand);
map.put(roleInstance.getHostname(),commandHost);
map.put(roleInstance.getHostname(), commandHost);
}
}
if(list.size() > 0){
if (list.size() > 0) {
commandService.saveBatch(list);
commandHostService.saveBatch(commandHostList);
hostCommandService.saveBatch(hostCommandList);

//通知commandActor执行命令
ActorRef dagBuildActor = ActorUtils.getLocalActor(DAGBuildActor.class,ActorUtils.getActorRefName(DAGBuildActor.class));
dagBuildActor.tell(new StartExecuteCommandCommand(commandIds,clusterId, commandType),ActorRef.noSender());
ActorRef dagBuildActor = ActorUtils.getLocalActor(DAGBuildActor.class, ActorUtils.getActorRefName(DAGBuildActor.class));
dagBuildActor.tell(new StartExecuteCommandCommand(commandIds, clusterId, commandType), ActorRef.noSender());
}
return Result.success(String.join(",",commandIds));
return Result.success(String.join(",", commandIds));
}

@Override
public Result generateServiceRoleCommand(Integer clusterId, CommandType commandType, Integer serviceInstanceId,List<String> serviceRoleInstanceIds) {
public Result generateServiceRoleCommands(Integer clusterId, CommandType commandType, Map<Integer, List<String>> instanceIdMap) {
Result result = null;
for (Map.Entry<Integer, List<String>> entry : instanceIdMap.entrySet()) {
result = generateServiceRoleCommand(clusterId, commandType, entry.getKey(), entry.getValue());
}
return result;
}

@Override
public Result generateServiceRoleCommand(Integer clusterId, CommandType commandType, Integer serviceInstanceId, List<String> serviceRoleInstanceIds) {
List<ClusterServiceCommandEntity> list = new ArrayList<>();
List<ClusterServiceCommandHostEntity> commandHostList = new ArrayList<>();
List<ClusterServiceCommandHostCommandEntity> hostCommandList = new ArrayList<>();
Expand All @@ -246,33 +254,33 @@ public Result generateServiceRoleCommand(Integer clusterId, CommandType commandT
ClusterServiceRoleInstanceEntity roleInstance = roleInstanceService.getById(id);

ClusterServiceCommandHostEntity commandHost;
if(map.containsKey(roleInstance.getHostname())){
if (map.containsKey(roleInstance.getHostname())) {
commandHost = map.get(roleInstance.getHostname());
}else {
} else {
commandHost = ProcessUtils.generateCommandHostEntity(commandId, roleInstance.getHostname());
commandHostList.add(commandHost);
}
ClusterServiceCommandHostCommandEntity hostCommand = ProcessUtils.generateCommandHostCommandEntity(commandType, commandId, roleInstance.getServiceRoleName(),roleInstance.getRoleType(), commandHost);
ClusterServiceCommandHostCommandEntity hostCommand = ProcessUtils.generateCommandHostCommandEntity(commandType, commandId, roleInstance.getServiceRoleName(), roleInstance.getRoleType(), commandHost);
hostCommandList.add(hostCommand);
map.put(roleInstance.getHostname(),commandHost);
map.put(roleInstance.getHostname(), commandHost);
}
commandService.saveBatch(list);
commandHostService.saveBatch(commandHostList);
hostCommandService.saveBatch(hostCommandList);

//通知commandActor执行命令
ActorRef dagBuildActor = ActorUtils.getLocalActor(DAGBuildActor.class,ActorUtils.getActorRefName(DAGBuildActor.class));
dagBuildActor.tell(new StartExecuteCommandCommand(commandIds,clusterId, commandType),ActorRef.noSender());
return Result.success(String.join(",",commandIds));
ActorRef dagBuildActor = ActorUtils.getLocalActor(DAGBuildActor.class, ActorUtils.getActorRefName(DAGBuildActor.class));
dagBuildActor.tell(new StartExecuteCommandCommand(commandIds, clusterId, commandType), ActorRef.noSender());
return Result.success(String.join(",", commandIds));
}

@Override
public void startExecuteCommand(Integer clusterId, String commandType, String commandIds) {
List<String> list = Arrays.asList(commandIds.split(","));
CommandType command = EnumUtil.fromString(CommandType.class, commandType);
//通知commandActor执行命令
ActorRef dagBuildActor = ActorUtils.getLocalActor(DAGBuildActor.class,ActorUtils.getActorRefName(DAGBuildActor.class));
dagBuildActor.tell(new StartExecuteCommandCommand(list,clusterId, command),ActorRef.noSender());
ActorRef dagBuildActor = ActorUtils.getLocalActor(DAGBuildActor.class, ActorUtils.getActorRefName(DAGBuildActor.class));
dagBuildActor.tell(new StartExecuteCommandCommand(list, clusterId, command), ActorRef.noSender());
}

@Override
Expand All @@ -283,11 +291,6 @@ public void cancelCommand(String commandId) {

@Override
public ClusterServiceCommandEntity getLastRestartCommand(Integer serviceInstanceId) {
return this.getOne(new QueryWrapper<ClusterServiceCommandEntity>()
.eq(Constants.SERVICE_INSTANCE_ID,serviceInstanceId)
.eq(Constants.COMMAND_TYPE,CommandType.RESTART_SERVICE.getValue())
.or()
.eq(Constants.COMMAND_TYPE,CommandType.INSTALL_SERVICE.getValue())
.orderByDesc(Constants.CREATE_TIME).last("limit 1"));
return this.getOne(new QueryWrapper<ClusterServiceCommandEntity>().eq(Constants.SERVICE_INSTANCE_ID, serviceInstanceId).eq(Constants.COMMAND_TYPE, CommandType.RESTART_SERVICE.getValue()).or().eq(Constants.COMMAND_TYPE, CommandType.INSTALL_SERVICE.getValue()).orderByDesc(Constants.CREATE_TIME).last("limit 1"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,15 @@ public class ClusterServiceRoleInstanceServiceImpl extends ServiceImpl<ClusterSe
@Autowired
private ClusterAlertHistoryService alertHistoryService;

@Override
public List<ClusterServiceRoleInstanceEntity> getStoppedServiceRoleListByHostnameAndClusterId(String hostname, Integer clusterId) {
return this.lambdaQuery()
.eq(ClusterServiceRoleInstanceEntity::getClusterId, clusterId)
.eq(ClusterServiceRoleInstanceEntity::getHostname, hostname)
.eq(ClusterServiceRoleInstanceEntity::getServiceRoleState, ServiceRoleState.STOP)
.list();
}

@Override
public List<ClusterServiceRoleInstanceEntity> getServiceRoleListByHostnameAndClusterId(String hostname, Integer clusterId) {
return this.list(new QueryWrapper<ClusterServiceRoleInstanceEntity>()
Expand Down

0 comments on commit ae038a3

Please sign in to comment.