Improve DDL generation method in migration preparation (#20245)
* Revise #20207

* Improve parameter name

* Fix codestyle
azexcy authored Aug 18, 2022
1 parent 0cd2ab9 commit 3b15d53
Showing 7 changed files with 106 additions and 130 deletions.
org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
@@ -17,10 +17,10 @@

 package org.apache.shardingsphere.data.pipeline.core.metadata.generator;

-import javax.sql.DataSource;
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.spi.ddlgenerator.CreateTableSQLGenerator;
 import org.apache.shardingsphere.data.pipeline.spi.ddlgenerator.CreateTableSQLGeneratorFactory;
 import org.apache.shardingsphere.infra.binder.LogicSQL;
 import org.apache.shardingsphere.infra.binder.SQLStatementContextFactory;
@@ -33,18 +33,20 @@
 import org.apache.shardingsphere.infra.binder.type.IndexAvailable;
 import org.apache.shardingsphere.infra.binder.type.TableAvailable;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.infra.exception.ShardingSphereException;
 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.metadata.database.schema.util.IndexMetaDataUtil;
 import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
 import org.apache.shardingsphere.infra.rule.identifier.type.DataNodeContainedRule;
+import org.apache.shardingsphere.infra.util.spi.exception.ServiceProviderNotFoundException;
 import org.apache.shardingsphere.sql.parser.sql.common.segment.SQLSegment;
 import org.apache.shardingsphere.sql.parser.sql.common.segment.ddl.constraint.ConstraintSegment;
 import org.apache.shardingsphere.sql.parser.sql.common.segment.ddl.index.IndexSegment;
 import org.apache.shardingsphere.sql.parser.sql.common.segment.generic.table.SimpleTableSegment;
 import org.apache.shardingsphere.sql.parser.sql.common.segment.generic.table.TableNameSegment;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;

+import javax.sql.DataSource;
+import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.Collection;
 import java.util.Collections;
@@ -69,25 +71,22 @@ public final class PipelineDDLGenerator {
     /**
      * Generate logic ddl sql.
      *
-     * @param database database
-     * @param dataSourceName data source name
+     * @param sourceDataSource source data source
+     * @param databaseType database type
      * @param schemaName schema name
      * @param logicTableName table name
      * @param actualTableName actual table name
      * @param parserEngine parser engine
      * @return ddl SQL
      */
     @SneakyThrows
-    public String generateLogicDDLSQL(final ShardingSphereDatabase database, final String dataSourceName, final String schemaName,
-                                      final String logicTableName, final String actualTableName,
+    public String generateLogicDDLSQL(final DataSource sourceDataSource, final DatabaseType databaseType, final String schemaName, final String logicTableName, final String actualTableName,
                                       final ShardingSphereSQLParserEngine parserEngine) {
-        DatabaseType databaseType = database.getProtocolType();
-        log.info("generateLogicDDLSQL, databaseType={}, databaseName={}, schemaName={}, tableName={}, dataSourceNames={}",
-                databaseType.getType(), database.getName(), schemaName, logicTableName, database.getResource().getDataSources().keySet());
-        Collection<String> multiSQL = generateActualDDLSQL(databaseType, schemaName, actualTableName, database.getResource().getDataSources().get(dataSourceName));
+        log.info("generateLogicDDLSQL, databaseType={}, schemaName={}, tableName={}", databaseType.getType(), schemaName, logicTableName);
+        Collection<String> multiSQL = generateActualDDLSQL(databaseType, schemaName, actualTableName, sourceDataSource);
         StringBuilder result = new StringBuilder();
         for (String each : multiSQL) {
-            Optional<String> logicSQL = decorate(databaseType, database.getName(), schemaName, database, each, parserEngine);
+            Optional<String> logicSQL = decorate(databaseType, schemaName, sourceDataSource, each, logicTableName, parserEngine);
             logicSQL.ifPresent(ddlSQL -> result.append(ddlSQL).append(DELIMITER).append(System.lineSeparator()));
         }
         return result.toString();
@@ -118,12 +117,16 @@ public String replaceTableNameWithPrefix(final String sql, final String prefix,
         return sql;
     }

-    private Optional<String> decorate(final DatabaseType databaseType, final String databaseName, final String schemaName, final ShardingSphereDatabase database, final String sql,
-                                      final ShardingSphereSQLParserEngine parserEngine) {
+    private Optional<String> decorate(final DatabaseType databaseType, final String schemaName, final DataSource dataSource, final String sql, final String logicTableName,
+                                      final ShardingSphereSQLParserEngine parserEngine) throws SQLException {
         if (sql.trim().isEmpty()) {
             return Optional.empty();
         }
-        String result = decorateActualSQL(sql.trim(), database, databaseName, parserEngine);
+        String databaseName;
+        try (Connection connection = dataSource.getConnection()) {
+            databaseName = connection.getCatalog();
+        }
+        String result = decorateActualSQL(sql.trim(), logicTableName, databaseName, parserEngine);
         // TODO remove it after set search_path is supported.
         if ("openGauss".equals(databaseType.getType())) {
             return decorateOpenGauss(databaseName, schemaName, result, parserEngine);
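Note: the rewritten decorate(...) no longer takes the database name from a ShardingSphereDatabase; it reads the catalog from a source connection instead. For reference, a minimal standalone sketch of that JDBC lookup (the class and method names here are illustrative, not part of this commit):

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;

public final class CatalogNameExample {
    
    /**
     * Borrow a connection just long enough to read the catalog (database) name,
     * then return it to the pool via try-with-resources.
     */
    public static String getCatalogName(final DataSource dataSource) throws SQLException {
        try (Connection connection = dataSource.getConnection()) {
            return connection.getCatalog();
        }
    }
}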
@@ -132,39 +135,38 @@ private Optional<String> decorate(final DatabaseType databaseType, final String
     }

     private Collection<String> generateActualDDLSQL(final DatabaseType databaseType, final String schemaName, final String actualTable, final DataSource dataSource) throws SQLException {
-        return CreateTableSQLGeneratorFactory.findInstance(databaseType).orElseThrow(() -> new ShardingSphereException("Failed to get dialect ddl sql generator"))
+        return CreateTableSQLGeneratorFactory.findInstance(databaseType).orElseThrow(() -> new ServiceProviderNotFoundException(CreateTableSQLGenerator.class, databaseType.getType()))
                 .generate(actualTable, schemaName, dataSource);
     }

-    private String decorateActualSQL(final String sql, final ShardingSphereDatabase database, final String databaseName, final ShardingSphereSQLParserEngine parserEngine) {
+    private String decorateActualSQL(final String sql, final String logicTableName, final String databaseName, final ShardingSphereSQLParserEngine parserEngine) {
         LogicSQL logicSQL = getLogicSQL(sql, databaseName, parserEngine);
         SQLStatementContext<?> sqlStatementContext = logicSQL.getSqlStatementContext();
         Map<SQLSegment, String> replaceMap = new TreeMap<>(Comparator.comparing(SQLSegment::getStartIndex));
         if (sqlStatementContext instanceof CreateTableStatementContext) {
-            appendFromIndexAndConstraint(replaceMap, database, sqlStatementContext);
-            appendFromTable(replaceMap, database, (TableAvailable) sqlStatementContext);
+            appendFromIndexAndConstraint(replaceMap, logicTableName, sqlStatementContext);
+            appendFromTable(replaceMap, logicTableName, (TableAvailable) sqlStatementContext);
         }
         if (sqlStatementContext instanceof CommentStatementContext) {
-            appendFromTable(replaceMap, database, (TableAvailable) sqlStatementContext);
+            appendFromTable(replaceMap, logicTableName, (TableAvailable) sqlStatementContext);
         }
         if (sqlStatementContext instanceof CreateIndexStatementContext) {
-            appendFromTable(replaceMap, database, (TableAvailable) sqlStatementContext);
-            appendFromIndexAndConstraint(replaceMap, database, sqlStatementContext);
+            appendFromTable(replaceMap, logicTableName, (TableAvailable) sqlStatementContext);
+            appendFromIndexAndConstraint(replaceMap, logicTableName, sqlStatementContext);
         }
         if (sqlStatementContext instanceof AlterTableStatementContext) {
-            appendFromIndexAndConstraint(replaceMap, database, sqlStatementContext);
-            appendFromTable(replaceMap, database, (TableAvailable) sqlStatementContext);
+            appendFromIndexAndConstraint(replaceMap, logicTableName, sqlStatementContext);
+            appendFromTable(replaceMap, logicTableName, (TableAvailable) sqlStatementContext);
         }
         return doDecorateActualTable(replaceMap, sql);
     }

-    private void appendFromIndexAndConstraint(final Map<SQLSegment, String> replaceMap, final ShardingSphereDatabase database, final SQLStatementContext<?> sqlStatementContext) {
+    private void appendFromIndexAndConstraint(final Map<SQLSegment, String> replaceMap, final String logicTableName, final SQLStatementContext<?> sqlStatementContext) {
         if (!(sqlStatementContext instanceof TableAvailable) || ((TableAvailable) sqlStatementContext).getTablesContext().getTables().isEmpty()) {
             return;
         }
         TableNameSegment tableNameSegment = ((TableAvailable) sqlStatementContext).getTablesContext().getTables().iterator().next().getTableName();
-        String logicTable = findLogicTable(tableNameSegment, database);
-        if (!tableNameSegment.getIdentifier().getValue().equals(logicTable)) {
+        if (!tableNameSegment.getIdentifier().getValue().equals(logicTableName)) {
             if (sqlStatementContext instanceof IndexAvailable) {
                 for (IndexSegment each : ((IndexAvailable) sqlStatementContext).getIndexes()) {
                     String logicIndexName = IndexMetaDataUtil.getLogicIndexName(each.getIndexName().getIdentifier().getValue(), tableNameSegment.getIdentifier().getValue());
@@ -180,11 +182,10 @@ private void appendFromIndexAndConstraint(final Map<SQLSegment, String> replaceM
         }
     }

-    private void appendFromTable(final Map<SQLSegment, String> replaceMap, final ShardingSphereDatabase database, final TableAvailable sqlStatementContext) {
+    private void appendFromTable(final Map<SQLSegment, String> replaceMap, final String logicTableName, final TableAvailable sqlStatementContext) {
         for (SimpleTableSegment each : sqlStatementContext.getAllTables()) {
-            String logicTable = findLogicTable(each.getTableName(), database);
-            if (!logicTable.equals(each.getTableName().getIdentifier().getValue())) {
-                replaceMap.put(each.getTableName(), logicTable);
+            if (!logicTableName.equals(each.getTableName().getIdentifier().getValue())) {
+                replaceMap.put(each.getTableName(), logicTableName);
             }
         }
     }
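Taken together, the new entry point is driven by a plain DataSource and an explicit DatabaseType rather than a ShardingSphereDatabase. A sketch of invoking the revised signature; the schema and table names below are placeholders, not values from this commit:

import javax.sql.DataSource;
import org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLGenerator;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;

public final class GenerateLogicDDLExample {
    
    /**
     * Generate the logic DDL for one table; "t_order" / "t_order_0" are sample
     * logic and actual table names used only for illustration.
     */
    public static String generateOrderTableDDL(final DataSource sourceDataSource, final DatabaseType databaseType,
                                               final ShardingSphereSQLParserEngine parserEngine) {
        PipelineDDLGenerator generator = new PipelineDDLGenerator();
        return generator.generateLogicDDLSQL(sourceDataSource, databaseType, "public", "t_order", "t_order_0", parserEngine);
    }
}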
org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
@@ -17,27 +17,30 @@

 package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;

-import java.util.Optional;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
+import org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLGenerator;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;

+import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Set;
 import java.util.regex.Pattern;
-import org.apache.shardingsphere.infra.datanode.DataNode;
-import org.apache.shardingsphere.infra.datanode.DataNodes;
-import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;

 /**
  * Abstract data source preparer.
@@ -57,7 +60,7 @@ public void prepareTargetSchemas(final PrepareTargetSchemasParameter parameter)
         String defaultSchema = DatabaseTypeEngine.getDefaultSchemaName(parameter.getTargetDatabaseType(), parameter.getDatabaseName());
         log.info("prepareTargetSchemas, schemaNames={}, defaultSchema={}", schemaNames, defaultSchema);
         PipelineSQLBuilder pipelineSQLBuilder = PipelineSQLBuilderFactory.getInstance(parameter.getTargetDatabaseType().getType());
-        try (Connection targetConnection = getTargetCachedDataSource(parameter.getDataSourceConfig(), parameter.getDataSourceManager()).getConnection()) {
+        try (Connection targetConnection = getCachedDataSource(parameter.getDataSourceConfig(), parameter.getDataSourceManager()).getConnection()) {
             for (String each : schemaNames) {
                 if (each.equalsIgnoreCase(defaultSchema)) {
                     continue;
@@ -91,7 +94,7 @@ protected final PipelineDataSourceWrapper getSourceCachedDataSource(final Migrat
         return dataSourceManager.getDataSource(PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter()));
     }

-    protected final PipelineDataSourceWrapper getTargetCachedDataSource(final PipelineDataSourceConfiguration dataSourceConfig, final PipelineDataSourceManager dataSourceManager) {
+    protected final PipelineDataSourceWrapper getCachedDataSource(final PipelineDataSourceConfiguration dataSourceConfig, final PipelineDataSourceManager dataSourceManager) {
         return dataSourceManager.getDataSource(dataSourceConfig);
     }
@@ -116,11 +119,17 @@ protected final String addIfNotExistsForCreateTableSQL(final String createTableS
         return PATTERN_CREATE_TABLE.matcher(createTableSQL).replaceFirst("CREATE TABLE IF NOT EXISTS ");
     }

-    protected String getActualTable(final ShardingSphereDatabase database, final String tableName) {
-        DataNodes dataNodes = new DataNodes(database.getRuleMetaData().getRules());
-        Optional<DataNode> filteredDataNode = dataNodes.getDataNodes(tableName).stream()
-                .filter(each -> database.getResource().getDataSources().containsKey(each.getDataSourceName().contains(".") ? each.getDataSourceName().split("\\.")[0] : each.getDataSourceName()))
-                .findFirst();
-        return filteredDataNode.map(DataNode::getTableName).orElse(tableName);
+    protected List<String> listCreateLogicalTableSQL(final PrepareTargetTablesParameter parameter) {
+        PipelineDDLGenerator generator = new PipelineDDLGenerator();
+        List<String> result = new LinkedList<>();
+        for (JobDataNodeEntry each : parameter.getTablesFirstDataNodes().getEntries()) {
+            String schemaName = parameter.getTableNameSchemaNameMapping().getSchemaName(each.getLogicTableName());
+            String dataSourceName = each.getDataNodes().get(0).getDataSourceName();
+            DataSource dataSource = parameter.getSourceDataSourceMap().get(dataSourceName);
+            DatabaseType databaseType = DatabaseTypeEngine.getDatabaseType(Collections.singletonList(dataSource));
+            String actualTableName = parameter.getTableNameMap().get(each.getLogicTableName());
+            result.add(generator.generateLogicDDLSQL(dataSource, databaseType, schemaName, each.getLogicTableName(), actualTableName, parameter.getSqlParserEngine()));
+        }
+        return result;
     }
 }
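The unchanged context above also shows addIfNotExistsForCreateTableSQL making CREATE TABLE statements idempotent via a regex rewrite. A runnable sketch of that rewrite; the pattern literal is an assumption, since the definition of PATTERN_CREATE_TABLE lies outside this diff:

import java.util.regex.Pattern;

public final class CreateTableSQLRewriteExample {
    
    // Assumed equivalent of PATTERN_CREATE_TABLE; the real constant is defined
    // elsewhere in the preparer class and is not shown in this commit.
    private static final Pattern PATTERN_CREATE_TABLE = Pattern.compile("CREATE\\s+TABLE\\s+", Pattern.CASE_INSENSITIVE);
    
    public static String addIfNotExists(final String createTableSQL) {
        return PATTERN_CREATE_TABLE.matcher(createTableSQL).replaceFirst("CREATE TABLE IF NOT EXISTS ");
    }
    
    public static void main(final String[] args) {
        // Prints: CREATE TABLE IF NOT EXISTS t_order (order_id BIGINT)
        System.out.println(addIfNotExists("CREATE TABLE t_order (order_id BIGINT)"));
    }
}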
PrepareTargetTablesParameter.java
@@ -23,6 +23,10 @@
 import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
+import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
+
+import javax.sql.DataSource;
+import java.util.Map;

 /**
  * Prepare target tables parameter.
@@ -34,19 +38,29 @@ public final class PrepareTargetTablesParameter {

     private final JobDataNodeLine tablesFirstDataNodes;

-    private final PipelineDataSourceConfiguration dataSourceConfig;
+    private final PipelineDataSourceConfiguration targetDataSourceConfig;

+    private final Map<String, DataSource> sourceDataSourceMap;
+
     private final PipelineDataSourceManager dataSourceManager;

+    private final Map<String, String> tableNameMap;
+
     private final TableNameSchemaNameMapping tableNameSchemaNameMapping;

-    public PrepareTargetTablesParameter(@NonNull final String databaseName, @NonNull final PipelineDataSourceConfiguration dataSourceConfig,
-                                        @NonNull final PipelineDataSourceManager dataSourceManager,
-                                        @NonNull final String tablesFirstDataNodes, final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
+    private final ShardingSphereSQLParserEngine sqlParserEngine;
+
+    public PrepareTargetTablesParameter(@NonNull final String databaseName, @NonNull final PipelineDataSourceConfiguration targetDataSourceConfig,
+                                        @NonNull final Map<String, DataSource> sourceDataSourceMap, @NonNull final PipelineDataSourceManager dataSourceManager,
+                                        @NonNull final JobDataNodeLine tablesFirstDataNodes, final Map<String, String> tableNameMap, final TableNameSchemaNameMapping tableNameSchemaNameMapping,
+                                        @NonNull final ShardingSphereSQLParserEngine sqlParserEngine) {
         this.databaseName = databaseName;
-        this.dataSourceConfig = dataSourceConfig;
-        this.tablesFirstDataNodes = JobDataNodeLine.unmarshal(tablesFirstDataNodes);
+        this.targetDataSourceConfig = targetDataSourceConfig;
+        this.sourceDataSourceMap = sourceDataSourceMap;
+        this.tablesFirstDataNodes = tablesFirstDataNodes;
         this.dataSourceManager = dataSourceManager;
+        this.tableNameMap = tableNameMap;
         this.tableNameSchemaNameMapping = tableNameSchemaNameMapping;
+        this.sqlParserEngine = sqlParserEngine;
     }
 }
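With the expanded parameter object, callers now supply the source data sources, table name mappings, and parser engine up front. A sketch of wiring the new constructor; the database name is a placeholder, and the imports for PrepareTargetTablesParameter and TableNameSchemaNameMapping are omitted because their packages are not visible in this diff:

import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;

import javax.sql.DataSource;
import java.util.Map;

public final class PrepareTargetTablesParameterExample {
    
    public static PrepareTargetTablesParameter build(final PipelineDataSourceConfiguration targetDataSourceConfig,
                                                     final Map<String, DataSource> sourceDataSourceMap,
                                                     final PipelineDataSourceManager dataSourceManager,
                                                     final JobDataNodeLine tablesFirstDataNodes,
                                                     final Map<String, String> tableNameMap,
                                                     final TableNameSchemaNameMapping tableNameSchemaNameMapping,
                                                     final ShardingSphereSQLParserEngine sqlParserEngine) {
        // "sharding_db" is a sample database name. Note that tablesFirstDataNodes is now
        // an already-unmarshalled JobDataNodeLine rather than a String parsed internally.
        return new PrepareTargetTablesParameter("sharding_db", targetDataSourceConfig, sourceDataSourceMap,
                dataSourceManager, tablesFirstDataNodes, tableNameMap, tableNameSchemaNameMapping, sqlParserEngine);
    }
}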
(Diffs for the remaining 4 of the 7 changed files are not shown here.)