Skip to content

Commit

Permalink
[INLONG-9314][Manager] Support cluster switch for InlongGroup
Browse files Browse the repository at this point in the history
  • Loading branch information
vernedeng committed Nov 29, 2023
1 parent 7583980 commit 11c780d
Show file tree
Hide file tree
Showing 7 changed files with 193 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,11 @@ public class ClusterSwitch {
* MQ resource for backup, represents the namespace of Pulsar, the topic of TubeMQ, etc.
*/
public static final String BACKUP_MQ_RESOURCE = "backup_mq_resource";

/**
* Cluster swtich start time
*/
public static final String CLUSTER_SWITCH_TIME = "cluster_switch_time";

public static final int FINISH_SWITCH_INTERVAL_MIN = 10;
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ public enum ErrorCodeEnum {
CLUSTER_INFO_INCORRECT(1103, "Cluster info was incorrect"),
CLUSTER_TAG_NOT_FOUND(1104, "Cluster tag information does not exist"),

TENANT_CLUSTER_TAG_NOT_FOUND(1105, "Tenant Cluster tag does not exist"),

DATA_NODE_NOT_FOUND(1150, "Data node information does not exist"),
DATA_NODE_TYPE_NOT_SUPPORTED(1151, "Data node type '%s' not supported"),
DATA_NODE_ID_CHANGED(1152, "Data node information's id not equals"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public interface TenantClusterTagEntityMapper {

TenantClusterTagEntity selectByPrimaryKey(Integer id);

TenantClusterTagEntity selectByUniqueKey(String clusterTag, String tenant);

List<TenantClusterTagEntity> selectByTag(String clusterTag);

List<TenantClusterTagEntity> selectByCondition(TenantClusterTagPageRequest request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@
from tenant_cluster_tag
where id = #{id,jdbcType=INTEGER}
</select>
<select id="selectByUniqueKey" resultMap="BaseResultMap">
select
<include refid="Base_Column_List" />
from tenant_cluster_tag
where tenant = #{tenant,jdbcType=VARCHAR}
and cluster_tag = #{clusterTag,jdbcType=VARCHAR}
and is_deleted = 0
</select>
<select id="selectByTag" parameterType="java.lang.String" resultMap="BaseResultMap">
select
<include refid="Base_Column_List" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,4 +207,8 @@ void updateAfterApprove(
*/
Map<String, Object> detail(String groupId);

Boolean startTagSwitch(String groupId, String clusterTag);

Boolean finishTagSwitch(String groupId);

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,24 @@
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.ProcessName;
import org.apache.inlong.manager.common.enums.TenantUserTypeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamExtEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.entity.TenantClusterTagEntity;
import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupExtEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamExtEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.dao.mapper.TenantClusterTagEntityMapper;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
import org.apache.inlong.manager.pojo.common.OrderTypeEnum;
Expand All @@ -48,20 +53,27 @@
import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicRequest;
import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.sort.BaseSortConf;
import org.apache.inlong.manager.pojo.sort.BaseSortConf.SortType;
import org.apache.inlong.manager.pojo.sort.FlinkSortConf;
import org.apache.inlong.manager.pojo.sort.UserDefinedSortConf;
import org.apache.inlong.manager.pojo.source.StreamSource;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.user.LoginUserUtils;
import org.apache.inlong.manager.pojo.user.UserInfo;
import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.service.source.SourceOperatorFactory;
import org.apache.inlong.manager.service.source.StreamSourceOperator;
import org.apache.inlong.manager.service.stream.InlongStreamService;
import org.apache.inlong.manager.service.workflow.WorkflowService;

import com.fasterxml.jackson.core.type.TypeReference;
import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.collections.CollectionUtils;
Expand All @@ -76,6 +88,7 @@
import org.springframework.transaction.annotation.Transactional;
import org.springframework.validation.annotation.Validated;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand All @@ -85,8 +98,8 @@
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.inlong.common.constant.ClusterSwitch.BACKUP_CLUSTER_TAG;
import static org.apache.inlong.manager.pojo.common.PageRequest.MAX_PAGE_SIZE;
import static org.apache.inlong.manager.workflow.event.process.ProcessEventListener.EXECUTOR_SERVICE;

/**
* Inlong group service layer implementation
Expand All @@ -107,11 +120,19 @@ public class InlongGroupServiceImpl implements InlongGroupService {
@Autowired
private InlongStreamService streamService;
@Autowired
private StreamSinkService streamSinkService;
@Autowired
private StreamSourceEntityMapper streamSourceMapper;
@Autowired
private TenantClusterTagEntityMapper tenantClusterTagMapper;
@Autowired
private InlongStreamExtEntityMapper streamExtMapper;
@Autowired
private InlongClusterService clusterService;
@Autowired
private WorkflowService workflowService;
@Autowired
private InlongClusterEntityMapper clusterEntityMapper;

@Autowired
private InlongGroupOperatorFactory groupOperatorFactory;
Expand Down Expand Up @@ -672,4 +693,134 @@ private void chkUnmodifiableParams(InlongGroupEntity entity, InlongGroupRequest
String.format("record has expired with record version=%d, request version=%d",
entity.getVersion(), request.getVersion()));
}

@Override
@Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ, propagation = Propagation.REQUIRES_NEW)
public Boolean startTagSwitch(String groupId, String clusterTag) {
LOGGER.info("start to switch cluster tag for group={}, target tag={}", groupId, clusterTag);

InlongGroupInfo groupInfo = this.get(groupId);

// check if the group is under switching
List<InlongGroupExtInfo> groupExt = groupInfo.getExtList();
Set<String> keys = groupExt.stream()
.map(InlongGroupExtInfo::getKeyName)
.collect(Collectors.toSet());

if (keys.contains(BACKUP_CLUSTER_TAG) || keys.contains(BACKUP_MQ_RESOURCE)) {
String errMsg = String.format("switch failed, current group is under switching, group=[%s]", groupId);
LOGGER.error(errMsg);
throw new BusinessException(errMsg);
}

// check if the cluster tag is under current tenant
InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
if (groupEntity == null) {
LOGGER.error("inlong group not found by groupId={}", groupId);
throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
}

TenantClusterTagEntity tenantClusterTag =
tenantClusterTagMapper.selectByUniqueKey(clusterTag, groupEntity.getTenant());
if (tenantClusterTag == null) {
LOGGER.error("tenant cluster not found for tenant={}, cluster={}", groupEntity.getTenant(), clusterTag);
throw new BusinessException(ErrorCodeEnum.TENANT_CLUSTER_TAG_NOT_FOUND);
}

// check if all sink related sort cluster has the target cluster tag
List<StreamSink> sinks = streamSinkService.listSink(groupEntity.getInlongGroupId(), null);
for (StreamSink sink : sinks) {
String clusterName = sink.getInlongClusterName();
InlongClusterEntity clusterEntity =
clusterEntityMapper.selectByNameAndType(clusterName, null);
if (clusterEntity == null) {
String errMsg = String.format("find no cluster with name=[%s]", clusterName);
LOGGER.error(errMsg);
throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND, errMsg);
}

Set<String> tags = ImmutableSet.copyOf(clusterEntity.getClusterTags().split(InlongConstants.COMMA));
if (!tags.isEmpty() && !tags.contains(clusterTag)) {
String errMsg = String.format("find no cluster tag=[%s] in cluster name=[%s]", clusterTag, clusterName);
LOGGER.error(errMsg);
throw new BusinessException(ErrorCodeEnum.CLUSTER_TAG_NOT_FOUND, errMsg);
}

}

// config cluster tag and backup_cluster_tag
UserInfo userInfo = LoginUserUtils.getLoginUser();
InlongGroupRequest request = groupInfo.genRequest();
String oldClusterTag = request.getInlongClusterTag();
request.setInlongClusterTag(clusterTag);
request.getExtList().add(new InlongGroupExtInfo(null, groupId, BACKUP_CLUSTER_TAG, oldClusterTag));
request.getExtList().add(new InlongGroupExtInfo(null, groupId, BACKUP_MQ_RESOURCE, request.getMqResource()));
request.getExtList().add(new InlongGroupExtInfo(null, groupId, CLUSTER_SWITCH_TIME,
LocalDateTime.now().toString()));
this.update(request, userInfo.getName());

// trigger group workflow to rebuild configs
this.triggerWorkFlow(groupInfo, userInfo);
LOGGER.info("success to switch cluster tag for group={}, target tag={}", groupId, clusterTag);
return true;
}

@Override
@Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ, propagation = Propagation.REQUIRES_NEW)
public Boolean finishTagSwitch(String groupId) {
LOGGER.info("start to finish switch cluster tag for group={}", groupId);

InlongGroupInfo groupInfo = this.get(groupId);
UserInfo userInfo = LoginUserUtils.getLoginUser();

// check if the group is under switching
List<InlongGroupExtInfo> groupExt = groupInfo.getExtList();
Map<String, InlongGroupExtInfo> extInfoMap = groupExt.stream()
.collect(Collectors.toMap(InlongGroupExtInfo::getKeyName, v -> v));

if (!extInfoMap.containsKey(BACKUP_CLUSTER_TAG) || !extInfoMap.containsKey(BACKUP_MQ_RESOURCE)) {
String errMsg = String.format("finish switch failed, current group is not under switching, group=[%s]",
groupId);
LOGGER.error(errMsg);
throw new BusinessException(errMsg);
}

InlongGroupExtInfo switchTime = extInfoMap.get(CLUSTER_SWITCH_TIME);
LocalDateTime switchStartTime =
switchTime == null ? LocalDateTime.MIN : LocalDateTime.parse(switchTime.getKeyValue());

// check the switch time
LocalDateTime allowSwitchTime = switchStartTime.plusMinutes(FINISH_SWITCH_INTERVAL_MIN);
if (LocalDateTime.now().isBefore(allowSwitchTime)) {
String errMsg = String.format("finish switch failed, please retry until={}", allowSwitchTime);
LOGGER.error(errMsg);
throw new BusinessException(errMsg);
}

// remove backup ext info
removeExt(extInfoMap.get(BACKUP_CLUSTER_TAG));
removeExt(extInfoMap.get(BACKUP_MQ_RESOURCE));
removeExt(extInfoMap.get(CLUSTER_SWITCH_TIME));

// trigger group workflow to rebuild configs
this.triggerWorkFlow(groupInfo, userInfo);
return true;
}

private void triggerWorkFlow(InlongGroupInfo groupInfo, UserInfo userInfo) {
GroupResourceProcessForm processForm = new GroupResourceProcessForm();
processForm.setGroupInfo(groupInfo);
List<InlongStreamInfo> streamList = streamService.list(groupInfo.getInlongGroupId());
processForm.setStreamInfos(streamList);
EXECUTOR_SERVICE.execute(
() -> workflowService.startAsync(ProcessName.CREATE_GROUP_RESOURCE, userInfo, processForm));
}

private void removeExt(InlongGroupExtInfo extInfo) {
if (extInfo == null || extInfo.getId() == null) {
return;
}
groupExtMapper.deleteByPrimaryKey(extInfo.getId());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
Expand Down Expand Up @@ -206,4 +207,21 @@ public Response<Map<String, Object>> detail(@PathVariable String groupId) {
return Response.success(groupService.detail(groupId));
}

@RequestMapping(value = "/group/switch/start/{groupId}/{clusterTag}", method = RequestMethod.GET)
@ApiOperation(value = "start tag switch")
@ApiImplicitParams({
@ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class, required = true),
@ApiImplicitParam(name = "clusterTag", value = "cluster tag", dataTypeClass = String.class, required = true)
})
public Response<Boolean> startTagSwitch(@PathVariable String groupId, @PathVariable String clusterTag) {
return Response.success(groupService.startTagSwitch(groupId, clusterTag));
}

@RequestMapping(value = "/group/switch/finish/{groupId}", method = RequestMethod.GET)
@ApiOperation(value = "finish tag switch")
@ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class, required = true)
public Response<Boolean> finishTagSwitch(@PathVariable String groupId) {
return Response.success(groupService.finishTagSwitch(groupId));
}

}

0 comments on commit 11c780d

Please sign in to comment.