Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improvement](jdbc catalog) Optimize JdbcCatalog case mapping stability #41510

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ protected List<String> listDatabaseNames() {
}
}

/**
 * Converts a database name as reported by the remote source into the name used locally.
 * This base implementation performs no conversion and returns its argument unchanged.
 *
 * @param remoteDatabaseName database name as reported by the remote source
 * @return {@code remoteDatabaseName}, unmodified
 */
public String fromRemoteDatabaseName(String remoteDatabaseName) {
return remoteDatabaseName;
}

// Will be called when creating a catalog (and also when replaying)
// to add some default properties if they are missing.
public void setDefaultPropsIfMissing(boolean isReplay) {
Expand Down Expand Up @@ -219,6 +223,10 @@ public void checkWhenCreating() throws DdlException {
*/
public abstract boolean tableExist(SessionContext ctx, String dbName, String tblName);

/**
 * Converts a table name as reported by the remote source into the name used locally.
 * This base implementation performs no conversion: {@code remoteDatabaseName} is not consulted
 * and {@code remoteTableName} is returned unchanged.
 *
 * @param remoteDatabaseName remote name of the database that contains the table
 * @param remoteTableName table name as reported by the remote source
 * @return {@code remoteTableName}, unmodified
 */
public String fromRemoteTableName(String remoteDatabaseName, String remoteTableName) {
return remoteTableName;
}

/**
* init some local objects such as:
* hms client, read properties from hive-site.xml, es client
Expand Down Expand Up @@ -702,13 +710,14 @@ protected ExternalDatabase<? extends ExternalTable> buildDbForInit(String dbName
if (dbName.equals(MysqlDb.DATABASE_NAME)) {
return new ExternalMysqlDatabase(this, dbId);
}
String localDbName = fromRemoteDatabaseName(dbName);
switch (logType) {
case HMS:
return new HMSExternalDatabase(this, dbId, dbName);
case ES:
return new EsExternalDatabase(this, dbId, dbName);
case JDBC:
return new JdbcExternalDatabase(this, dbId, dbName);
return new JdbcExternalDatabase(this, dbId, localDbName, dbName);
case ICEBERG:
return new IcebergExternalDatabase(this, dbId, dbName);
case MAX_COMPUTE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ public abstract class ExternalDatabase<T extends ExternalTable>
protected long id;
@SerializedName(value = "name")
protected String name;
@SerializedName(value = "remoteName")
protected String remoteName;
@SerializedName(value = "dbProperties")
protected DatabaseProperty dbProperties = new DatabaseProperty();
@SerializedName(value = "initialized")
Expand Down Expand Up @@ -105,6 +107,16 @@ public ExternalDatabase(ExternalCatalog extCatalog, long id, String name, InitDa
this.extCatalog = extCatalog;
this.id = id;
this.name = name;
this.remoteName = name;
this.dbLogType = dbLogType;
}

/**
 * Creates an external database whose local and remote names are supplied separately.
 *
 * @param extCatalog catalog stored as the owner of this database
 * @param id database id
 * @param name value stored as the local database name
 * @param remoteName value stored as the remote database name
 * @param dbLogType log type stored on this database
 */
public ExternalDatabase(ExternalCatalog extCatalog, long id, String name, String remoteName,
        InitDatabaseLog.Type dbLogType) {
    // Names first: the local one and the one used on the remote side.
    this.name = name;
    this.remoteName = remoteName;
    // Then identity and ownership.
    this.id = id;
    this.extCatalog = extCatalog;
    this.dbLogType = dbLogType;
}

Expand Down Expand Up @@ -242,7 +254,7 @@ private List<String> listTableNames() {
} else if (name.equals(MysqlDb.DATABASE_NAME)) {
tableNames = ExternalMysqlDatabase.listTableNames();
} else {
tableNames = extCatalog.listTableNames(null, name).stream().map(tableName -> {
tableNames = listRemoteTableNames().stream().map(tableName -> {
lowerCaseToTableName.put(tableName.toLowerCase(), tableName);
if (Env.isStoredTableNamesLowerCase()) {
return tableName.toLowerCase();
Expand All @@ -254,6 +266,10 @@ private List<String> listTableNames() {
return tableNames;
}

/**
 * Asks the owning catalog for the table names of this database.
 * The lookup is made with this database's {@code name} and a {@code null} session context.
 *
 * @return the list returned by the catalog
 */
protected List<String> listRemoteTableNames() {
    final List<String> remoteTableNames = this.extCatalog.listTableNames(null, this.name);
    return remoteTableNames;
}

protected abstract T buildTableForInit(String tableName, long tblId, ExternalCatalog catalog);

public Optional<T> getTableForReplay(long tableId) {
Expand Down Expand Up @@ -328,6 +344,10 @@ public String getFullName() {
return name;
}

/**
 * Returns the name of this database on the remote side.
 *
 * <p>{@code remoteName} is a serialized field. NOTE(review): instances restored from metadata
 * written before the field existed presumably have it unset - confirm against the replay path.
 * When it is {@code null} or empty the local {@code name} is returned instead, which is the same
 * value the constructor without a remote name stores in {@code remoteName}.
 *
 * @return the remote database name, or the local name when no remote name is recorded
 */
public String getRemoteName() {
    if (remoteName == null || remoteName.isEmpty()) {
        return name;
    }
    return remoteName;
}

@Override
public DatabaseProperty getDbProperties() {
return dbProperties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,16 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
protected long id;
@SerializedName(value = "name")
protected String name;
@SerializedName(value = "remoteName")
protected String remoteName;
@SerializedName(value = "type")
protected TableType type = null;
@SerializedName(value = "timestamp")
protected long timestamp;
@SerializedName(value = "dbName")
protected String dbName;
@SerializedName(value = "remoteDbName")
protected String remoteDbName;
@SerializedName(value = "ta")
private final TableAttributes tableAttributes = new TableAttributes();

Expand Down Expand Up @@ -100,12 +104,25 @@ public ExternalTable() {
/**
 * Creates an external table whose remote identifiers are the same as its local ones.
 *
 * @param id table id
 * @param name table name; stored both as the local and as the remote table name
 * @param catalog catalog stored as the owner of this table
 * @param dbName database name; stored both as the local and as the remote database name
 * @param type table type
 */
public ExternalTable(long id, String name, ExternalCatalog catalog, String dbName, TableType type) {
    this.id = id;
    this.name = name;
    this.remoteName = name;
    this.catalog = catalog;
    this.dbName = dbName;
    // Mirror the remoteName default above so the remote database name is never left null
    // for tables created without explicit remote identifiers.
    this.remoteDbName = dbName;
    this.type = type;
    this.objectCreated = false;
}

/**
 * Creates an external table with separately supplied local and remote identifiers.
 *
 * @param id table id
 * @param name value stored as the local table name
 * @param remoteName value stored as the remote table name
 * @param catalog catalog stored as the owner of this table
 * @param dbName value stored as the local database name
 * @param remoteDbName value stored as the remote database name
 * @param type table type
 */
public ExternalTable(long id, String name, String remoteName, ExternalCatalog catalog, String dbName,
        String remoteDbName, TableType type) {
    // Identity and ownership.
    this.id = id;
    this.type = type;
    this.catalog = catalog;
    // Local identifiers.
    this.dbName = dbName;
    this.name = name;
    // Identifiers on the remote side.
    this.remoteDbName = remoteDbName;
    this.remoteName = remoteName;
    this.objectCreated = false;
}

/**
 * Replaces the catalog reference held by this table.
 *
 * @param catalog catalog to store on this table
 */
public void setCatalog(ExternalCatalog catalog) {
this.catalog = catalog;
}
Expand Down Expand Up @@ -135,6 +152,10 @@ public String getName() {
return name;
}

/**
 * Returns the name of this table on the remote side.
 *
 * <p>{@code remoteName} is a serialized field. NOTE(review): instances restored from metadata
 * written before the field existed presumably have it unset - confirm against the replay path.
 * When it is {@code null} or empty the local {@code name} is returned instead, which is the same
 * value the constructor without a remote name stores in {@code remoteName}.
 *
 * @return the remote table name, or the local name when no remote name is recorded
 */
public String getRemoteName() {
    if (remoteName == null || remoteName.isEmpty()) {
        return name;
    }
    return remoteName;
}

@Override
public TableType getType() {
return type;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@
import org.apache.doris.common.FeConstants;
import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.SessionContext;
import org.apache.doris.datasource.jdbc.client.JdbcClient;
import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
import org.apache.doris.datasource.jdbc.client.JdbcClientException;
import org.apache.doris.datasource.mapping.IdentifierMapping;
import org.apache.doris.datasource.mapping.JdbcIdentifierMapping;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.InternalService.PJdbcTestConnectionRequest;
import org.apache.doris.proto.InternalService.PJdbcTestConnectionResult;
Expand All @@ -55,6 +58,7 @@

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

Expand All @@ -71,6 +75,7 @@ public class JdbcExternalCatalog extends ExternalCatalog {
// Must add "transient" for Gson to ignore this field,
// or Gson will throw exception with HikariCP
private transient JdbcClient jdbcClient;
private transient IdentifierMapping identifierMapping;

public JdbcExternalCatalog(long catalogId, String name, String resource, Map<String, String> props,
String comment)
Expand Down Expand Up @@ -119,20 +124,19 @@ public void onRefresh(boolean invalidCache) {
super.onRefresh(invalidCache);
if (jdbcClient != null) {
jdbcClient.closeClient();
jdbcClient = null;
}
}

@Override
public void onRefreshCache(boolean invalidCache) {
onRefresh(invalidCache);
identifierMapping = null;
}

@Override
public void onClose() {
super.onClose();
if (jdbcClient != null) {
jdbcClient.closeClient();
jdbcClient = null;
}
identifierMapping = null;
}

protected Map<String, String> processCompatibleProperties(Map<String, String> props)
Expand Down Expand Up @@ -232,8 +236,6 @@ protected void initLocalObjectsImpl() {
.setDriverUrl(getDriverUrl())
.setDriverClass(getDriverClass())
.setOnlySpecifiedDatabase(getOnlySpecifiedDatabase())
.setIsLowerCaseMetaNames(getLowerCaseMetaNames())
.setMetaNamesMapping(getMetaNamesMapping())
.setIncludeDatabaseMap(getIncludeDatabaseMap())
.setExcludeDatabaseMap(getExcludeDatabaseMap())
.setConnectionPoolMinSize(getConnectionPoolMinSize())
Expand All @@ -243,22 +245,43 @@ protected void initLocalObjectsImpl() {
.setConnectionPoolKeepAlive(isConnectionPoolKeepAlive());

jdbcClient = JdbcClient.createJdbcClient(jdbcClientConfig);
identifierMapping = new JdbcIdentifierMapping(Boolean.parseBoolean(getLowerCaseMetaNames()),
getMetaNamesMapping());
}

protected List<String> listDatabaseNames() {
@Override
public List<String> listDatabaseNames() {
return jdbcClient.getDatabaseNameList();
}

/**
 * Converts a remote database name to its local name by delegating to the identifier mapping.
 *
 * <p>NOTE(review): {@code identifierMapping} is assigned in {@code initLocalObjectsImpl} and reset
 * to {@code null} elsewhere in this class; this method does not check for {@code null} itself.
 *
 * @param remoteDatabaseName database name as reported by the remote source
 * @return the name produced by the identifier mapping
 */
@Override
public String fromRemoteDatabaseName(String remoteDatabaseName) {
    final String localDatabaseName = this.identifierMapping.fromRemoteDatabaseName(remoteDatabaseName);
    return localDatabaseName;
}

/**
 * Lists the tables of a database through the JDBC client.
 * The catalog is initialized first; {@code ctx} is not used by this implementation.
 *
 * @param ctx session context (unused here)
 * @param dbName database name handed to the JDBC client
 * @return the table name list returned by the JDBC client
 */
@Override
public List<String> listTableNames(SessionContext ctx, String dbName) {
    makeSureInitialized();
    final List<String> tableNames = this.jdbcClient.getTablesNameList(dbName);
    return tableNames;
}

/**
 * Converts a remote table name to its local name by delegating to the identifier mapping.
 *
 * <p>NOTE(review): {@code identifierMapping} is assigned in {@code initLocalObjectsImpl} and reset
 * to {@code null} elsewhere in this class; this method does not check for {@code null} itself.
 *
 * @param remoteDatabaseName remote name of the database that contains the table
 * @param remoteTableName table name as reported by the remote source
 * @return the name produced by the identifier mapping
 */
@Override
public String fromRemoteTableName(String remoteDatabaseName, String remoteTableName) {
    final String localTableName =
            this.identifierMapping.fromRemoteTableName(remoteDatabaseName, remoteTableName);
    return localTableName;
}

/**
 * Checks whether a table exists on the remote source.
 *
 * <p>The local database and table names are first resolved to the locally registered table so
 * that the remote database/table names recorded on it can be used for the remote lookup.
 *
 * @param ctx session context (unused here)
 * @param dbName local database name
 * @param tblName local table name
 * @return {@code true} if the JDBC client reports the table under its remote names;
 *         {@code false} if the table is not registered locally or the client does not find it
 * @throws NullPointerException if no database named {@code dbName} is registered
 */
@Override
public boolean tableExist(SessionContext ctx, String dbName, String tblName) {
    makeSureInitialized();
    ExternalTable tbl = Objects.requireNonNull(this.getDbNullable(dbName)).getTableNullable(tblName);
    if (tbl == null) {
        // An existence check for an unknown table must answer "no" rather than fail with an NPE.
        return false;
    }
    String remoteDbName = tbl.getRemoteDbName();
    String remoteTblName = tbl.getRemoteName();
    return jdbcClient.isTableExist(remoteDbName, remoteTblName);
}

/**
 * Fetches the columns of a table through the JDBC client after making sure the catalog is
 * initialized.
 *
 * @param dbName database name handed to the JDBC client
 * @param tblName table name handed to the JDBC client
 * @return the column list returned by the JDBC client
 */
public List<Column> listColumns(String dbName, String tblName) {
    makeSureInitialized();
    final List<Column> jdbcColumns = this.jdbcClient.getColumnsFromJdbc(dbName, tblName);
    return jdbcColumns;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.doris.datasource.ExternalDatabase;
import org.apache.doris.datasource.InitDatabaseLog;

import java.util.List;

public class JdbcExternalDatabase extends ExternalDatabase<JdbcExternalTable> {

/**
Expand All @@ -30,13 +32,21 @@ public class JdbcExternalDatabase extends ExternalDatabase<JdbcExternalTable> {
* @param id database id.
* @param name database name.
*/
public JdbcExternalDatabase(ExternalCatalog extCatalog, long id, String name) {
super(extCatalog, id, name, InitDatabaseLog.Type.JDBC);
public JdbcExternalDatabase(ExternalCatalog extCatalog, long id, String name, String remoteName) {
super(extCatalog, id, name, remoteName, InitDatabaseLog.Type.JDBC);
}

@Override
protected JdbcExternalTable buildTableForInit(String tableName, long tblId, ExternalCatalog catalog) {
// NOTE(review): the return directly below and the statements after it look like the old and new
// sides of one change shown together; as written nothing after the first return can run - confirm
// which side is meant to stay.
return new JdbcExternalTable(tblId, tableName, name, (JdbcExternalCatalog) extCatalog);
// tableName is handed to the catalog as the remote table name; the catalog returns the local one.
String remoteDbName = this.getRemoteName();
String localTblName = extCatalog.fromRemoteTableName(remoteDbName, tableName);
// Argument order: id, local table name, remote table name, local db name, remote db name, catalog.
return new JdbcExternalTable(tblId, localTblName, tableName, name, remoteDbName,
(JdbcExternalCatalog) extCatalog);
}

/**
 * Lists this database's tables by querying the catalog with the remote database name
 * instead of the local one.
 *
 * @return the table name list returned by the catalog
 */
@Override
protected List<String> listRemoteTableNames() {
    // Read the remote name through the getter, as buildTableForInit does.
    final String remoteDbName = getRemoteName();
    return extCatalog.listTableNames(null, remoteDbName);
}

public void addTableForTest(JdbcExternalTable tbl) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.JdbcResource;
import org.apache.doris.catalog.JdbcTable;
import org.apache.doris.datasource.ExternalDatabase;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.qe.AutoCloseConnectContext;
Expand All @@ -32,6 +33,7 @@
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.thrift.TTableDescriptor;

import com.google.common.collect.Maps;
import org.apache.commons.text.StringSubstitutor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -40,6 +42,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

/**
* Jdbc external table.
Expand Down Expand Up @@ -76,8 +79,9 @@ public class JdbcExternalTable extends ExternalTable {
* @param dbName Database name.
* @param catalog JdbcExternalCatalog.
*/
public JdbcExternalTable(long id, String name, String dbName, JdbcExternalCatalog catalog) {
super(id, name, catalog, dbName, TableType.JDBC_EXTERNAL_TABLE);
public JdbcExternalTable(long id, String name, String remoteName, String dbName, String remoteDbName,
JdbcExternalCatalog catalog) {
super(id, name, remoteName, catalog, dbName, remoteDbName, TableType.JDBC_EXTERNAL_TABLE);
}

@Override
Expand All @@ -102,21 +106,50 @@ public TTableDescriptor toThrift() {

@Override
public Optional<SchemaCacheValue> initSchema() {
// NOTE(review): the return directly below and the statements after it look like the old and new
// sides of one change shown together; as written nothing after the first return can run - confirm
// which side is meant to stay.
return Optional.of(new SchemaCacheValue(((JdbcExternalCatalog) catalog).getJdbcClient()
.getColumnsFromJdbc(dbName, name)));
// Fetch the columns using the remote database and table names.
List<Column> columns = ((JdbcExternalCatalog) catalog).listColumns(remoteDbName, remoteName);
// Names as returned by the catalog, captured before the columns are renamed below.
List<String> remoteColumnNames = columns.stream()
.map(Column::getName)
.collect(Collectors.toList());
// Ask the catalog's identifier mapping for the local name of every remote column name.
List<String> localColumnNames = remoteColumnNames.stream()
.map(remoteColumnName -> ((JdbcExternalCatalog) catalog).getIdentifierMapping()
.fromRemoteColumnName(remoteDbName, remoteName, remoteColumnName))
.collect(Collectors.toList());
// Rename the Column objects in place; the three lists share the same index order.
for (int i = 0; i < columns.size(); i++) {
columns.get(i).setName(localColumnNames.get(i));
}
// Keep a local-name -> remote-name lookup alongside the schema.
Map<String, String> remoteColumnNamesMap = Maps.newHashMap();
for (int i = 0; i < remoteColumnNames.size(); i++) {
remoteColumnNamesMap.put(localColumnNames.get(i), remoteColumnNames.get(i));
}
return Optional.of(new JdbcSchemaCacheValue(columns, remoteColumnNamesMap));
}

/**
 * Builds a {@link JdbcTable} for this external table.
 *
 * <p>The table is created under the local {@code dbName.name} and configured by the catalog,
 * then given the remote database name, the remote table name and a local-to-remote column name
 * map. A column's remote name comes from the cached schema value when one is present and yields
 * a name for that column; otherwise the column's local name is used as its remote name.
 *
 * @return the configured JDBC table
 */
private JdbcTable toJdbcTable() {
    List<Column> schema = getFullSchema();
    JdbcExternalCatalog jdbcCatalog = (JdbcExternalCatalog) catalog;
    String fullTableName = this.dbName + "." + this.name;
    JdbcTable jdbcTable = new JdbcTable(this.id, fullTableName, schema, TableType.JDBC_EXTERNAL_TABLE);
    jdbcCatalog.configureJdbcTable(jdbcTable, fullTableName);

    // Set remote properties
    jdbcTable.setRemoteDatabaseName(((ExternalDatabase<?>) this.getDatabase()).getRemoteName());
    jdbcTable.setRemoteTableName(this.getRemoteName());

    Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
    Map<String, String> remoteColumnNames = Maps.newHashMap();
    for (Column column : schema) {
        String localColumnName = column.getName();
        // Optional.map drops a null lookup result, so an unknown column falls back to its local name.
        String remoteColumnName = schemaCacheValue
                .map(value -> ((JdbcSchemaCacheValue) value).getremoteColumnName(localColumnName))
                .orElse(localColumnName);
        remoteColumnNames.put(localColumnName, remoteColumnName);
    }
    // The map holds one entry per schema column, so no separate handling of an empty map is needed.
    jdbcTable.setRemoteColumnNames(remoteColumnNames);

    return jdbcTable;
}
Expand Down
Loading
Loading