Skip to content

Commit

Permalink
[INLONG-10277][Manager] Fix the problem of migration failed when migr…
Browse files Browse the repository at this point in the history
…ating multiple groups to the same tenant (#10285)
  • Loading branch information
fuweng11 authored May 28, 2024
1 parent e0a564b commit af0cc82
Showing 1 changed file with 23 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.inlong.manager.dao.entity.InlongTenantEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.entity.TenantClusterTagEntity;
import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongConsumeEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
Expand Down Expand Up @@ -236,14 +237,16 @@ public Boolean doMigrate(String groupId, String from, String to) {
// get related streams, consumes and tag;
List<InlongStreamEntity> streamList = streamMapper.selectByGroupId(groupId);
boolean streamMigrateResult = streamList.stream().allMatch(stream -> migrateStream(stream, from, to));
log.info("migrate stream from source tenant={} to target tenant={} for groupId={}, result={}", from, to,
groupId, streamMigrateResult);
List<InlongConsumeEntity> consumeList = consumeEntityMapper.selectByGroupId(groupId);
boolean consumeMigrateResult = this.migrateConsume(groupId, from, to, consumeList.size());
boolean tagCopyResult = this.copyTenantTag(group.getInlongClusterTag(), from, to);

return streamMigrateResult
&& consumeMigrateResult
&& tagCopyResult
&& this.migrateGroup(groupId, from, to);
boolean groupMigrateResult = this.migrateGroup(groupId, from, to);
boolean migrateResult = streamMigrateResult && consumeMigrateResult && tagCopyResult && groupMigrateResult;
log.info("migrate from source tenant={} to target tenant={} for groupId={}, result={}", from, to, groupId,
migrateResult);
return migrateResult;
}

public Boolean migrateStream(InlongStreamEntity stream, String from, String to) {
Expand Down Expand Up @@ -305,17 +308,29 @@ public String copyDataNode(String name, String type, String from, String to) {
}

public Boolean migrateConsume(String groupId, String from, String to, int size) {
return consumeEntityMapper.migrate(groupId, from, to) == size;
Boolean result = consumeEntityMapper.migrate(groupId, from, to) == size;
log.info("migrate consume from source tenant={} to target tenant={} for groupId={}, result={}", from, to,
groupId, result);
return result;
}

public Boolean migrateGroup(String groupId, String from, String to) {
return groupMapper.migrate(groupId, from, to) == InlongConstants.AFFECTED_ONE_ROW;
Boolean result = groupMapper.migrate(groupId, from, to) == InlongConstants.AFFECTED_ONE_ROW;
log.info("migrate group from source tenant={} to target tenant={} for groupId={}, result={}", from, to, groupId,
result);
return result;
}

public Boolean copyTenantTag(String clusterTag, String from, String to) {
try {
TenantClusterTagEntity existEntity = tenantClusterTagMapper.selectByUniqueKey(clusterTag, to);
if (existEntity != null) {
log.debug("tag name={} in tenant={} already exist", clusterTag, to);
return true;
}
// use displayName as the new name
return tenantClusterTagMapper.copy(clusterTag, from, to) == InlongConstants.AFFECTED_ONE_ROW;
tenantClusterTagMapper.copy(clusterTag, from, to);
return true;
} catch (Exception e) {
Throwable cause = e.getCause();
if (cause instanceof SQLIntegrityConstraintViolationException) {
Expand Down

0 comments on commit af0cc82

Please sign in to comment.