Skip to content

Commit

Permalink
[improvement](jdbc catalog) Optimize JdbcCatalog case mapping stability
Browse files Browse the repository at this point in the history
  • Loading branch information
zy-kkk committed Nov 4, 2024
1 parent 1349b86 commit ac6a485
Show file tree
Hide file tree
Showing 9 changed files with 335 additions and 413 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.apache.doris.datasource.jdbc.client.JdbcClient;
import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
import org.apache.doris.datasource.jdbc.client.JdbcClientException;
import org.apache.doris.datasource.mapping.IdentifierMapping;
import org.apache.doris.datasource.mapping.JdbcIdentifierMapping;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.InternalService.PJdbcTestConnectionRequest;
import org.apache.doris.proto.InternalService.PJdbcTestConnectionResult;
Expand All @@ -57,6 +59,7 @@
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

@Getter
public class JdbcExternalCatalog extends ExternalCatalog {
Expand All @@ -71,6 +74,7 @@ public class JdbcExternalCatalog extends ExternalCatalog {
// Must add "transient" for Gson to ignore this field,
// or Gson will throw exception with HikariCP
private transient JdbcClient jdbcClient;
private transient IdentifierMapping identifierMapping;

public JdbcExternalCatalog(long catalogId, String name, String resource, Map<String, String> props,
String comment)
Expand Down Expand Up @@ -119,20 +123,27 @@ public void onRefresh(boolean invalidCache) {
super.onRefresh(invalidCache);
if (jdbcClient != null) {
jdbcClient.closeClient();
jdbcClient = null;
}
identifierMapping = null;
}

@Override
public void onRefreshCache(boolean invalidCache) {
onRefresh(invalidCache);
super.onRefreshCache(invalidCache);
if (identifierMapping != null) {
((JdbcIdentifierMapping) identifierMapping).clear();
}
}

@Override
public void onClose() {
super.onClose();
if (jdbcClient != null) {
jdbcClient.closeClient();
jdbcClient = null;
}
identifierMapping = null;
}

protected Map<String, String> processCompatibleProperties(Map<String, String> props)
Expand Down Expand Up @@ -232,8 +243,6 @@ protected void initLocalObjectsImpl() {
.setDriverUrl(getDriverUrl())
.setDriverClass(getDriverClass())
.setOnlySpecifiedDatabase(getOnlySpecifiedDatabase())
.setIsLowerCaseMetaNames(getLowerCaseMetaNames())
.setMetaNamesMapping(getMetaNamesMapping())
.setIncludeDatabaseMap(getIncludeDatabaseMap())
.setExcludeDatabaseMap(getExcludeDatabaseMap())
.setConnectionPoolMinSize(getConnectionPoolMinSize())
Expand All @@ -243,22 +252,88 @@ protected void initLocalObjectsImpl() {
.setConnectionPoolKeepAlive(isConnectionPoolKeepAlive());

jdbcClient = JdbcClient.createJdbcClient(jdbcClientConfig);
identifierMapping = new JdbcIdentifierMapping(this);
}

@Override
public List<String> listDatabaseNames() {
return jdbcClient.getDatabaseNameList().stream()
.map(identifierMapping::fromRemoteDatabaseName)
.collect(Collectors.toList());
}

public void loadRemoteDatabase() {
makeSureInitialized();
jdbcClient.getDatabaseNameList().stream()
.map(identifierMapping::fromRemoteDatabaseName)
.collect(Collectors.toList());
}

protected List<String> listDatabaseNames() {
return jdbcClient.getDatabaseNameList();
protected String getRemoteDatabaseName(String dbName) {
return identifierMapping.toRemoteDatabaseName(dbName);
}

@Override
public List<String> listTableNames(SessionContext ctx, String dbName) {
makeSureInitialized();
return jdbcClient.getTablesNameList(dbName);
String remoteDbName = getRemoteDatabaseName(dbName);
return jdbcClient.getTablesNameList(remoteDbName).stream()
.map(tblName -> identifierMapping.fromRemoteTableName(remoteDbName, tblName))
.collect(Collectors.toList());
}

public void loadRemoteTable(String remoteDbName) {
makeSureInitialized();
jdbcClient.getTablesNameList(remoteDbName).stream()
.map(tblName -> identifierMapping.fromRemoteTableName(remoteDbName, tblName))
.collect(Collectors.toList());
}

protected String getRemoteTableName(String dbName, String tblName) {
return identifierMapping.toRemoteTableName(getRemoteDatabaseName(dbName), tblName);
}

@Override
public boolean tableExist(SessionContext ctx, String dbName, String tblName) {
makeSureInitialized();
return jdbcClient.isTableExist(dbName, tblName);
String remoteDbName = getRemoteDatabaseName(dbName);
String remoteTblName = getRemoteTableName(dbName, tblName);
return jdbcClient.isTableExist(remoteDbName, remoteTblName);
}

public List<Column> listColumns(String dbName, String tblName) {
makeSureInitialized();
String remoteDbName = getRemoteDatabaseName(dbName);
String remoteTblName = getRemoteTableName(dbName, tblName);

List<Column> remoteColumns = jdbcClient.getColumnsFromJdbc(remoteDbName, remoteTblName);

List<String> remoteColumnNames = remoteColumns.stream()
.map(Column::getName)
.collect(Collectors.toList());

List<String> localColumnNames = remoteColumnNames.stream()
.map(remoteColumnName -> identifierMapping.fromRemoteColumnName(remoteDbName, remoteTblName,
remoteColumnName))
.collect(Collectors.toList());

for (int i = 0; i < remoteColumns.size(); i++) {
remoteColumns.get(i).setName(localColumnNames.get(i));
}

return remoteColumns;
}

public void loadRemoteColumns(String remoteDbName, String remoteTblName) {
makeSureInitialized();
jdbcClient.getColumnsFromJdbc(remoteDbName, remoteTblName).stream()
.map(column -> identifierMapping.fromRemoteColumnName(remoteDbName, remoteTblName, column.getName()))
.collect(Collectors.toList());
}

protected String getRemoteColumnNames(String dbName, String tblName, String localColumnName) {
return identifierMapping.toRemoteColumnName(getRemoteDatabaseName(dbName), getRemoteTableName(dbName, tblName),
localColumnName);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.thrift.TTableDescriptor;

import com.google.common.collect.Maps;
import org.apache.commons.text.StringSubstitutor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -102,21 +103,33 @@ public TTableDescriptor toThrift() {

@Override
public Optional<SchemaCacheValue> initSchema() {
return Optional.of(new SchemaCacheValue(((JdbcExternalCatalog) catalog).getJdbcClient()
.getColumnsFromJdbc(dbName, name)));
return Optional.of(new SchemaCacheValue(((JdbcExternalCatalog) catalog).listColumns(dbName, name)));
}

private JdbcTable toJdbcTable() {
List<Column> schema = getFullSchema();
JdbcExternalCatalog jdbcCatalog = (JdbcExternalCatalog) catalog;
String fullDbName = this.dbName + "." + this.name;
JdbcTable jdbcTable = new JdbcTable(this.id, fullDbName, schema, TableType.JDBC_EXTERNAL_TABLE);
jdbcCatalog.configureJdbcTable(jdbcTable, fullDbName);
String fullTableName = this.dbName + "." + this.name;
JdbcTable jdbcTable = new JdbcTable(this.id, fullTableName, schema, TableType.JDBC_EXTERNAL_TABLE);
jdbcCatalog.configureJdbcTable(jdbcTable, fullTableName);

// Set remote properties
jdbcTable.setRemoteDatabaseName(jdbcCatalog.getJdbcClient().getRemoteDatabaseName(this.dbName));
jdbcTable.setRemoteTableName(jdbcCatalog.getJdbcClient().getRemoteTableName(this.dbName, this.name));
jdbcTable.setRemoteColumnNames(jdbcCatalog.getJdbcClient().getRemoteColumnNames(this.dbName, this.name));
jdbcTable.setRemoteDatabaseName(jdbcCatalog.getRemoteDatabaseName(this.dbName));
jdbcTable.setRemoteTableName(jdbcCatalog.getRemoteTableName(this.dbName, this.name));
Map<String, String> remoteColumnNames = Maps.newHashMap();
for (Column column : schema) {
String remoteColumnName = jdbcCatalog.getRemoteColumnNames(this.dbName, this.name, column.getName());
remoteColumnNames.put(column.getName(), remoteColumnName);
}
if (!remoteColumnNames.isEmpty()) {
jdbcTable.setRemoteColumnNames(remoteColumnNames);
} else {
remoteColumnNames = Maps.newHashMap();
for (Column column : getFullSchema()) {
remoteColumnNames.put(column.getName(), column.getName());
}
jdbcTable.setRemoteColumnNames(remoteColumnNames);
}

return jdbcTable;
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.doris.cloud.security.SecurityChecker;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.jdbc.JdbcIdentifierMapping;
import org.apache.doris.datasource.jdbc.util.JdbcFieldSchema;

import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -63,11 +62,8 @@ public abstract class JdbcClient {
protected ClassLoader classLoader = null;
protected HikariDataSource dataSource = null;
protected boolean isOnlySpecifiedDatabase;
protected boolean isLowerCaseMetaNames;
protected String metaNamesMapping;
protected Map<String, Boolean> includeDatabaseMap;
protected Map<String, Boolean> excludeDatabaseMap;
protected JdbcIdentifierMapping jdbcLowerCaseMetaMatching;

public static JdbcClient createJdbcClient(JdbcClientConfig jdbcClientConfig) {
String dbType = parseDbType(jdbcClientConfig.getJdbcUrl());
Expand Down Expand Up @@ -104,8 +100,6 @@ protected JdbcClient(JdbcClientConfig jdbcClientConfig) {
this.catalogName = jdbcClientConfig.getCatalog();
this.jdbcUser = jdbcClientConfig.getUser();
this.isOnlySpecifiedDatabase = Boolean.parseBoolean(jdbcClientConfig.getOnlySpecifiedDatabase());
this.isLowerCaseMetaNames = Boolean.parseBoolean(jdbcClientConfig.getIsLowerCaseMetaNames());
this.metaNamesMapping = jdbcClientConfig.getMetaNamesMapping();
this.includeDatabaseMap =
Optional.ofNullable(jdbcClientConfig.getIncludeDatabaseMap()).orElse(Collections.emptyMap());
this.excludeDatabaseMap =
Expand All @@ -114,7 +108,6 @@ protected JdbcClient(JdbcClientConfig jdbcClientConfig) {
this.dbType = parseDbType(jdbcUrl);
initializeClassLoader(jdbcClientConfig);
initializeDataSource(jdbcClientConfig);
this.jdbcLowerCaseMetaMatching = new JdbcIdentifierMapping(isLowerCaseMetaNames, metaNamesMapping, this);
}

// Initialize DataSource
Expand Down Expand Up @@ -170,6 +163,7 @@ public static String parseDbType(String jdbcUrl) {

public void closeClient() {
dataSource.close();
dataSource = null;
}

public Connection getConnection() throws JdbcClientException {
Expand Down Expand Up @@ -308,10 +302,9 @@ public List<String> getDatabaseNameList() {
/**
* get all tables of one database
*/
public List<String> getTablesNameList(String localDbName) {
public List<String> getTablesNameList(String remoteDbName) {
List<String> remoteTablesNames = Lists.newArrayList();
String[] tableTypes = getTableTypes();
String remoteDbName = getRemoteDatabaseName(localDbName);
processTable(remoteDbName, null, tableTypes, (rs) -> {
try {
while (rs.next()) {
Expand All @@ -321,14 +314,12 @@ public List<String> getTablesNameList(String localDbName) {
throw new JdbcClientException("failed to get all tables for remote database: `%s`", e, remoteDbName);
}
});
return filterTableNames(remoteDbName, remoteTablesNames);
return remoteTablesNames;
}

public boolean isTableExist(String localDbName, String localTableName) {
public boolean isTableExist(String remoteDbName, String remoteTableName) {
final boolean[] isExist = {false};
String[] tableTypes = getTableTypes();
String remoteDbName = getRemoteDatabaseName(localDbName);
String remoteTableName = getRemoteTableName(localDbName, localTableName);
processTable(remoteDbName, remoteTableName, tableTypes, (rs) -> {
try {
if (rs.next()) {
Expand All @@ -345,12 +336,10 @@ public boolean isTableExist(String localDbName, String localTableName) {
/**
* get all columns of one table
*/
public List<JdbcFieldSchema> getJdbcColumnsInfo(String localDbName, String localTableName) {
public List<JdbcFieldSchema> getJdbcColumnsInfo(String remoteDbName, String remoteTableName) {
Connection conn = null;
ResultSet rs = null;
List<JdbcFieldSchema> tableSchema = Lists.newArrayList();
String remoteDbName = getRemoteDatabaseName(localDbName);
String remoteTableName = getRemoteTableName(localDbName, localTableName);
try {
conn = getConnection();
DatabaseMetaData databaseMetaData = conn.getMetaData();
Expand All @@ -377,21 +366,7 @@ public List<Column> getColumnsFromJdbc(String localDbName, String localTableName
field.isAllowNull(), field.getRemarks(),
true, -1));
}
String remoteDbName = getRemoteDatabaseName(localDbName);
String remoteTableName = getRemoteTableName(localDbName, localTableName);
return filterColumnName(remoteDbName, remoteTableName, dorisTableSchema);
}

public String getRemoteDatabaseName(String localDbname) {
return jdbcLowerCaseMetaMatching.getRemoteDatabaseName(localDbname);
}

public String getRemoteTableName(String localDbName, String localTableName) {
return jdbcLowerCaseMetaMatching.getRemoteTableName(localDbName, localTableName);
}

public Map<String, String> getRemoteColumnNames(String localDbName, String localTableName) {
return jdbcLowerCaseMetaMatching.getRemoteColumnNames(localDbName, localTableName);
return dorisTableSchema;
}

// protected methods, for subclass to override
Expand Down Expand Up @@ -450,7 +425,7 @@ protected List<String> filterDatabaseNames(List<String> remoteDbNames) {
}
filteredDatabaseNames.add(databaseName);
}
return jdbcLowerCaseMetaMatching.setDatabaseNameMapping(filteredDatabaseNames);
return filteredDatabaseNames;
}

protected Set<String> getFilterInternalDatabases() {
Expand All @@ -461,14 +436,6 @@ protected Set<String> getFilterInternalDatabases() {
.build();
}

protected List<String> filterTableNames(String remoteDbName, List<String> remoteTableNames) {
return jdbcLowerCaseMetaMatching.setTableNameMapping(remoteDbName, remoteTableNames);
}

protected List<Column> filterColumnName(String remoteDbName, String remoteTableName, List<Column> remoteColumns) {
return jdbcLowerCaseMetaMatching.setColumnNameMapping(remoteDbName, remoteTableName, remoteColumns);
}

protected abstract Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema);

protected Type createDecimalOrStringType(int precision, int scale) {
Expand Down
Loading

0 comments on commit ac6a485

Please sign in to comment.