From 3febff292addc6e08813348cfbf5a7a275ed3252 Mon Sep 17 00:00:00 2001 From: mozhenghua Date: Sat, 5 Oct 2024 21:56:38 +0800 Subject: [PATCH] add jdbc connection poll support for dataSource https://github.com/datavane/tis/issues/366 --- .../cassandra/CassandraDatasourceFactory.java | 2 +- .../ClickHouseDataSourceFactory.java | 3 ++- .../tis/fullbuild/taskflow/HiveTask.java | 24 +++++++++---------- .../datax/common/BasicDataXRdbmsWriter.java | 5 ++-- .../plugin/datax/common/TableColsMeta.java | 5 ++-- .../tis/plugin/ds/BasicDataSourceFactory.java | 10 ++++---- .../dameng/ds/DaMengDataSourceFactory.java | 6 +++-- .../dameng/writer/TestDataXDaMengWriter.java | 2 +- .../plugin/ds/doris/DorisSourceFactory.java | 3 ++- .../datax/MariaDBDataSourceFactory.java | 3 ++- .../tis/plugin/datax/DataXOdpsWriter.java | 7 +++--- .../tis/plugin/datax/odps/JoinOdpsTask.java | 11 +++++---- .../datax/odps/OdpsDataSourceFactory.java | 3 ++- .../ds/oracle/OracleDataSourceFactory.java | 3 ++- .../ds/postgresql/PGDataSourceFactory.java | 3 ++- .../sqlserver/SqlServerDatasourceFactory.java | 3 ++- .../ds/starrocks/BasicSourceFactory.java | 5 ++-- .../ds/starrocks/StarRocksSourceFactory.java | 5 ++-- .../ds/mysql/MySQLDataSourceFactory.java | 1 + .../ds/mysql/MySQLV5DataSourceFactory.java | 3 ++- .../ds/mysql/MySQLV8DataSourceFactory.java | 3 ++- .../tis/dump/hive/BindHiveTableTool.java | 5 ++-- .../dump/hive/HiveRemoveHistoryDataTask.java | 15 ++++++------ .../tis/dump/hive/HiveTableBuilder.java | 15 ++++++------ .../fullbuild/taskflow/hive/JoinHiveTask.java | 11 +++++---- .../flattable/HiveFlatTableBuilder.java | 3 ++- .../tis/plugin/datax/DataXHiveWriter.java | 9 +++---- .../tis/dump/hive/TestHiveDBUtils.java | 3 ++- .../hive/TestHiveRemoveHistoryDataTask.java | 3 ++- .../tis/dump/hive/TestHiveTableBuilder.java | 3 ++- .../plugin/datax/TestDataXHiveWriterDump.java | 7 +++--- .../qlangtech/tis/dump/hive/HiveDBUtils.java | 21 ++++++++-------- .../tis/hive/DefaultHiveConnGetter.java | 5 ++-- .../hive/Hiveserver2DataSourceFactory.java | 3 ++- .../main/java/com/qlangtech/tis/hive/Hms.java | 17 ++++++------- .../tis/dump/hive/TestHiveDBUtils.java | 3 ++- .../TestHiveserver2DataSourceFactory.java | 2 +- .../chunjun/sink/TISJdbcOutputFormat.java | 4 ++-- .../flink/connector/ChunjunSinkFactory.java | 3 ++- .../TestTISFlinkCDCOracleSourceFunction.java | 3 ++- .../TestFlinkCDCPostgreSQLSourceFunction.java | 3 ++- .../sink/TISClickhouseOutputFormat.java | 3 ++- .../TestChunjunClickhouseSinkFactory.java | 2 +- .../sink/TestChunjunDorisSinkFactory.java | 2 +- .../tis/realtime/BasicFlinkSourceHandle.java | 4 ---- .../incr/flink/cdc/CUDCDCTestSuit.java | 3 ++- .../DefaultPowerjobCoreDataSource.java | 2 +- .../starrocks/BaseStarRocksTestCase.java | 3 ++- 48 files changed, 151 insertions(+), 116 deletions(-) diff --git a/tis-datax/tis-datax-cassandra-plugin/src/main/java/com/qlangtech/tis/plugin/ds/cassandra/CassandraDatasourceFactory.java b/tis-datax/tis-datax-cassandra-plugin/src/main/java/com/qlangtech/tis/plugin/ds/cassandra/CassandraDatasourceFactory.java index 66ff2147c..748ac037c 100644 --- a/tis-datax/tis-datax-cassandra-plugin/src/main/java/com/qlangtech/tis/plugin/ds/cassandra/CassandraDatasourceFactory.java +++ b/tis-datax/tis-datax-cassandra-plugin/src/main/java/com/qlangtech/tis/plugin/ds/cassandra/CassandraDatasourceFactory.java @@ -217,7 +217,7 @@ public String[] getHosts() { @Override - public JDBCConnection getConnection(String jdbcUrl,boolean verify) throws SQLException { + public JDBCConnection createConnection(String jdbcUrl,boolean verify) throws SQLException { throw new UnsupportedOperationException(); } diff --git a/tis-datax/tis-datax-clickhouse-plugin/src/main/java/com/qlangtech/tis/plugin/ds/clickhouse/ClickHouseDataSourceFactory.java b/tis-datax/tis-datax-clickhouse-plugin/src/main/java/com/qlangtech/tis/plugin/ds/clickhouse/ClickHouseDataSourceFactory.java index 5cdbc8c37..56976bb29 100644 --- a/tis-datax/tis-datax-clickhouse-plugin/src/main/java/com/qlangtech/tis/plugin/ds/clickhouse/ClickHouseDataSourceFactory.java +++ b/tis-datax/tis-datax-clickhouse-plugin/src/main/java/com/qlangtech/tis/plugin/ds/clickhouse/ClickHouseDataSourceFactory.java @@ -26,6 +26,7 @@ import com.qlangtech.tis.plugin.ds.ColumnMetaData; import com.qlangtech.tis.plugin.ds.DBConfig; import com.qlangtech.tis.plugin.ds.DataType; +import com.qlangtech.tis.plugin.ds.JDBCConnection; import com.qlangtech.tis.plugin.ds.TableInDB; import com.qlangtech.tis.plugin.ds.TableNotFoundException; import com.qlangtech.tis.sql.parser.tuple.creator.EntityName; @@ -141,7 +142,7 @@ public final String getJdbcUrl() { // } @Override - public JDBCConnection getConnection(String jdbcUrl, boolean verify) throws SQLException { + public JDBCConnection createConnection(String jdbcUrl, boolean verify) throws SQLException { try { Class.forName(JDBC_DRIVER); diff --git a/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/fullbuild/taskflow/HiveTask.java b/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/fullbuild/taskflow/HiveTask.java index f431fcc73..76c0e5bc2 100644 --- a/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/fullbuild/taskflow/HiveTask.java +++ b/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/fullbuild/taskflow/HiveTask.java @@ -93,7 +93,7 @@ protected HiveTask(IDataSourceFactoryGetter dsFactoryGetter, ISqlTask nodeMeta, * @return * @throws Exception */ - public static boolean isTableExists(DataSourceMeta ds, DataSourceMeta.JDBCConnection connection, EntityName dumpTable) throws Exception { + public static boolean isTableExists(DataSourceMeta ds, JDBCConnection connection, EntityName dumpTable) throws Exception { // 判断表是否存在 // if (!isDBExists(mrEngine, connection, dumpTable.getDbName())) { // // DB都不存在,table肯定就不存在啦 @@ -127,7 +127,7 @@ public static boolean isTableExists(DataSourceMeta ds, DataSourceMeta.JDBCConnec // return contain; } -// public static boolean isDBExists(MREngine mrEngine, DataSourceMeta.JDBCConnection connection, String dbName) throws Exception { +// public static boolean isDBExists(MREngine mrEngine, JDBCConnection connection, String dbName) throws Exception { // AtomicBoolean dbExist = new AtomicBoolean(false); // connection.query(mrEngine.showDBSQL, result -> { // if (StringUtils.equals(result.getString(1), dbName)) { @@ -200,7 +200,7 @@ protected final void executeTask(String taskname) { String insertSql = rewriteSql.convert2InsertIntoSQL(this.dsFactoryGetter.getDataSourceFactory(), this.nodeMeta.getExportName()); this.validateDependenciesNode(taskname); - final DataSourceMeta.JDBCConnection conn = this.getTaskContextObj(); + final JDBCConnection conn = this.getTaskContextObj(); DataSourceFactory dsFactory = dsFactoryGetter.getDataSourceFactory(); //final String newCreatePt = primaryTable.getTabPartition(); @@ -245,7 +245,7 @@ protected final void executeTask(String taskname) { */ private void processJoinTask(ISqlTask.RewriteSql sql) { try { - final DataSourceMeta.JDBCConnection conn = this.getTaskContextObj(); + final JDBCConnection conn = this.getTaskContextObj(); final ColsParser insertParser = new ColsParser(sql, conn); DataSourceFactory dsFactory = this.dsFactoryGetter.getDataSourceFactory(); @@ -265,11 +265,11 @@ private void processJoinTask(ISqlTask.RewriteSql sql) { * @param partitionRetainNum 保留多少个分区 * @throws Exception */ - protected abstract void initializeTable(DataSourceMeta mrEngine, ColsParser insertParser, DataSourceMeta.JDBCConnection conn, EntityName dumpTable, Integer partitionRetainNum) throws Exception; + protected abstract void initializeTable(DataSourceMeta mrEngine, ColsParser insertParser, JDBCConnection conn, EntityName dumpTable, Integer partitionRetainNum) throws Exception; - protected abstract void executeSql(String sql, DataSourceMeta.JDBCConnection conn) throws SQLException; + protected abstract void executeSql(String sql, JDBCConnection conn) throws SQLException; - protected abstract List getHistoryPts(DataSourceMeta mrEngine, DataSourceMeta.JDBCConnection conn, final EntityName table) throws Exception; + protected abstract List getHistoryPts(DataSourceMeta mrEngine, JDBCConnection conn, final EntityName table) throws Exception; protected void validateDependenciesNode(String taskname) { Boolean dependencyWorkStatus = null; @@ -356,10 +356,10 @@ private Object getValue(String columnLabel) throws SQLException { protected class ColsParser { private final ISqlTask.RewriteSql sql; - private final DataSourceMeta.JDBCConnection conn; + private final JDBCConnection conn; private AbstractInsertFromSelectParser sqlParser; - public ColsParser(ISqlTask.RewriteSql sql, DataSourceMeta.JDBCConnection conn) { + public ColsParser(ISqlTask.RewriteSql sql, JDBCConnection conn) { this.sql = sql; this.conn = conn; } @@ -391,7 +391,7 @@ public List getColsExcludePartitionCols() { } } - private AbstractInsertFromSelectParser getSQLParserResult(String sql, DataSourceMeta.JDBCConnection conn) { + private AbstractInsertFromSelectParser getSQLParserResult(String sql, JDBCConnection conn) { DataSourceFactory dsFactory = this.dsFactoryGetter.getDataSourceFactory(); Function> sqlColMetaGetter = (rewriteSql) -> { List cols = rewriteSql.getCols(); @@ -423,7 +423,7 @@ public ColumnMetaData create(String colName, int index) throws SQLException { } // private AbstractInsertFromSelectParser getSQLParserResult( -// String sql, DataSourceFactory dsFactory, DataSourceMeta.JDBCConnection conn, AbstractInsertFromSelectParser insertParser) { +// String sql, DataSourceFactory dsFactory, JDBCConnection conn, AbstractInsertFromSelectParser insertParser) { // // TabPartitions tabPartition = new TabPartitions(Collections.emptyMap()) { // @Override @@ -454,7 +454,7 @@ public interface IHistoryTableProcessor { * @param dumpTable * @throws Exception */ - public static void initializeTable(DataSourceMeta ds, DataSourceMeta.JDBCConnection conn + public static void initializeTable(DataSourceMeta ds, JDBCConnection conn , EntityName dumpTable, IHistoryTableProcessor historyTableProcessor, Supplier tableSameJudgement, Runnable tableCreator) throws Exception { // if (partitionRetainNum == null || partitionRetainNum < 1) { // throw new IllegalArgumentException("illegal param partitionRetainNum "); diff --git a/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/BasicDataXRdbmsWriter.java b/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/BasicDataXRdbmsWriter.java index 4310d2e3c..d67783d34 100644 --- a/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/BasicDataXRdbmsWriter.java +++ b/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/BasicDataXRdbmsWriter.java @@ -40,6 +40,7 @@ import com.qlangtech.tis.plugin.ds.DataSourceMeta; import com.qlangtech.tis.plugin.ds.IDataSourceFactoryGetter; import com.qlangtech.tis.plugin.ds.IInitWriterTableExecutor; +import com.qlangtech.tis.plugin.ds.JDBCConnection; import com.qlangtech.tis.plugin.ds.PostedDSProp; import com.qlangtech.tis.plugin.ds.TableNotFoundException; import com.qlangtech.tis.runtime.module.misc.IControlMsgHandler; @@ -162,7 +163,7 @@ private static void process(String dataXName, BasicDataXRdbmsWriter> private final DataSourceFactory datasource; private final String dbName; - private final DataSourceMeta.JDBCConnection connection; + private final JDBCConnection connection; public TableColsMeta(DataSourceFactory datasource, String dbName) { @@ -50,7 +51,7 @@ public TableColsMeta(DataSourceFactory datasource, String dbName) { this.dbName = dbName; final DBConfig dbConfig = datasource.getDbConfig(); - AtomicReference conn = new AtomicReference<>(); + AtomicReference conn = new AtomicReference<>(); try { dbConfig.vistDbName((config, jdbcUrl, ip, dbname) -> { conn.set(datasource.getConnection(jdbcUrl, false)); diff --git a/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/ds/BasicDataSourceFactory.java b/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/ds/BasicDataSourceFactory.java index dbff240d7..1de407755 100644 --- a/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/ds/BasicDataSourceFactory.java +++ b/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/ds/BasicDataSourceFactory.java @@ -242,10 +242,10 @@ protected TableInDB createTableInDB() { return TableInDB.create(this); } - @Override - public JDBCConnection getConnection(String jdbcUrl, boolean verify) throws SQLException { - return super.getConnection(jdbcUrl, verify); - } +// @Override +// public JDBCConnection createConnection(String jdbcUrl, boolean verify) throws SQLException { +// return super.createConnection(jdbcUrl, verify); +// } protected String getRefectTablesSql() { @@ -309,7 +309,7 @@ private void visitConnection(DBConfig db, String ip, String dbName JDBCConnectionFactory connectionFactory = (url, verify) -> getConnection(url, verify); - + validateConnection(connectionFactory, jdbcUrl, p); } catch (TisException e) { throw e; diff --git a/tis-datax/tis-datax-dameng-plugin/src/main/java/com/qlangtech/tis/plugin/datax/dameng/ds/DaMengDataSourceFactory.java b/tis-datax/tis-datax-dameng-plugin/src/main/java/com/qlangtech/tis/plugin/datax/dameng/ds/DaMengDataSourceFactory.java index b12fcd048..09c594ac6 100644 --- a/tis-datax/tis-datax-dameng-plugin/src/main/java/com/qlangtech/tis/plugin/datax/dameng/ds/DaMengDataSourceFactory.java +++ b/tis-datax/tis-datax-dameng-plugin/src/main/java/com/qlangtech/tis/plugin/datax/dameng/ds/DaMengDataSourceFactory.java @@ -14,6 +14,7 @@ import com.qlangtech.tis.plugin.ds.DataSourceFactory; import com.qlangtech.tis.plugin.ds.DataType; import com.qlangtech.tis.plugin.ds.IDataSourceDumper; +import com.qlangtech.tis.plugin.ds.JDBCConnection; import com.qlangtech.tis.plugin.ds.JDBCTypes; import com.qlangtech.tis.plugin.ds.SplitTableStrategy; import com.qlangtech.tis.plugin.ds.TISTable; @@ -96,7 +97,7 @@ protected EntityName logicTable2PhysicsTable(String jdbcUrl, EntityName table) { private transient dm.jdbc.driver.DmDriver driver; @Override - public JDBCConnection getConnection(String jdbcUrl, boolean verify) throws SQLException { + public JDBCConnection createConnection(String jdbcUrl, boolean verify) throws SQLException { if (driver == null) { driver = new dm.jdbc.driver.DmDriver(); } @@ -522,7 +523,8 @@ protected String getDataSourceName() { } @Override - protected boolean validateConnection(JDBCConnection conn, BasicDataSourceFactory dsFactory, IControlMsgHandler msgHandler, Context context) throws TisException { + protected boolean validateConnection(JDBCConnection conn, BasicDataSourceFactory dsFactory + , IControlMsgHandler msgHandler, Context context) throws TisException { final String schema = conn.getSchema(); DaMengDataSourceFactory damengSource = (DaMengDataSourceFactory) dsFactory; if (!StringUtils.equals(schema, damengSource.dbName) diff --git a/tis-datax/tis-datax-dameng-plugin/src/test/java/com/qlangtech/tis/plugin/datax/dameng/writer/TestDataXDaMengWriter.java b/tis-datax/tis-datax-dameng-plugin/src/test/java/com/qlangtech/tis/plugin/datax/dameng/writer/TestDataXDaMengWriter.java index 9576f0d0c..f16cefdbe 100644 --- a/tis-datax/tis-datax-dameng-plugin/src/test/java/com/qlangtech/tis/plugin/datax/dameng/writer/TestDataXDaMengWriter.java +++ b/tis-datax/tis-datax-dameng-plugin/src/test/java/com/qlangtech/tis/plugin/datax/dameng/writer/TestDataXDaMengWriter.java @@ -159,7 +159,7 @@ public void testTempateGenerate() throws Exception { DaMengDataSourceFactory mysqlDs = TestDaMengDataSourceFactory.createDaMengDataSourceFactory(); // new DaMengDataSourceFactory() { // @Override -// public DataSourceMeta.JDBCConnection getConnection(String jdbcUrl) throws SQLException { +// public JDBCConnection getConnection(String jdbcUrl) throws SQLException { // return null; // } // }; diff --git a/tis-datax/tis-datax-doris-plugin/src/main/java/com/qlangtech/tis/plugin/ds/doris/DorisSourceFactory.java b/tis-datax/tis-datax-doris-plugin/src/main/java/com/qlangtech/tis/plugin/ds/doris/DorisSourceFactory.java index 8cf37fb12..939e395d3 100644 --- a/tis-datax/tis-datax-doris-plugin/src/main/java/com/qlangtech/tis/plugin/ds/doris/DorisSourceFactory.java +++ b/tis-datax/tis-datax-doris-plugin/src/main/java/com/qlangtech/tis/plugin/ds/doris/DorisSourceFactory.java @@ -35,6 +35,7 @@ import com.qlangtech.tis.plugin.ds.ColumnMetaData; import com.qlangtech.tis.plugin.ds.DBConfig; import com.qlangtech.tis.plugin.ds.DataType; +import com.qlangtech.tis.plugin.ds.JDBCConnection; import com.qlangtech.tis.plugin.ds.JDBCTypes; import com.qlangtech.tis.plugin.ds.TableNotFoundException; import com.qlangtech.tis.runtime.module.misc.IControlMsgHandler; @@ -108,7 +109,7 @@ public String buidJdbcUrl(DBConfig db, String ip, String dbName) { @Override - public JDBCConnection getConnection(String jdbcUrl, boolean verify) throws SQLException { + public JDBCConnection createConnection(String jdbcUrl, boolean verify) throws SQLException { Properties props = new Properties(); props.put("useSSL", "false"); props.put("user", StringUtils.trimToEmpty(this.userName)); diff --git a/tis-datax/tis-datax-mariadb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/MariaDBDataSourceFactory.java b/tis-datax/tis-datax-mariadb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/MariaDBDataSourceFactory.java index 8b4efc74a..44d339425 100644 --- a/tis-datax/tis-datax-mariadb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/MariaDBDataSourceFactory.java +++ b/tis-datax/tis-datax-mariadb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/MariaDBDataSourceFactory.java @@ -22,6 +22,7 @@ import com.qlangtech.tis.annotation.Public; import com.qlangtech.tis.extension.TISExtension; import com.qlangtech.tis.plugin.ds.DBConfig; +import com.qlangtech.tis.plugin.ds.JDBCConnection; import com.qlangtech.tis.plugin.ds.mysql.MySQLDataSourceFactory; import com.qlangtech.tis.sql.parser.tuple.creator.EntityName; import org.apache.commons.lang3.StringUtils; @@ -42,7 +43,7 @@ public class MariaDBDataSourceFactory extends MySQLDataSourceFactory { private transient org.mariadb.jdbc.Driver driver; @Override - public JDBCConnection getConnection(String jdbcUrl, boolean verify) throws SQLException { + public JDBCConnection createConnection(String jdbcUrl, boolean verify) throws SQLException { if (driver == null) { driver = new org.mariadb.jdbc.Driver(); } diff --git a/tis-datax/tis-datax-odps-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXOdpsWriter.java b/tis-datax/tis-datax-odps-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXOdpsWriter.java index 5dc6f9374..c9bc2f432 100644 --- a/tis-datax/tis-datax-odps-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXOdpsWriter.java +++ b/tis-datax/tis-datax-odps-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXOdpsWriter.java @@ -45,6 +45,7 @@ import com.qlangtech.tis.plugin.ds.IColMetaGetter; import com.qlangtech.tis.plugin.ds.IDataSourceFactoryGetter; import com.qlangtech.tis.plugin.ds.ISelectedTab; +import com.qlangtech.tis.plugin.ds.JDBCConnection; import com.qlangtech.tis.runtime.module.misc.IFieldErrorHandler; import com.qlangtech.tis.sql.parser.ISqlTask; import com.qlangtech.tis.sql.parser.TabPartitions; @@ -322,10 +323,10 @@ public boolean isGenerateCreateDDLSwitchOff() { @Override public ExecuteResult startTask(ITableBuildTask dumpTask) { - try (DataSourceMeta.JDBCConnection conn = this.getConnection()) { + try (JDBCConnection conn = this.getConnection()) { return dumpTask.process(new ITaskContext() { @Override - public DataSourceMeta.JDBCConnection getObj() { + public JDBCConnection getObj() { return conn; } }); @@ -335,7 +336,7 @@ public DataSourceMeta.JDBCConnection getObj() { } - public DataSourceMeta.JDBCConnection getConnection() { + public JDBCConnection getConnection() { OdpsDataSourceFactory dsFactory = getDataSourceFactory(); String jdbcUrl = dsFactory.getJdbcUrl(); try { diff --git a/tis-datax/tis-datax-odps-plugin/src/main/java/com/qlangtech/tis/plugin/datax/odps/JoinOdpsTask.java b/tis-datax/tis-datax-odps-plugin/src/main/java/com/qlangtech/tis/plugin/datax/odps/JoinOdpsTask.java index d6371c9cd..8ba5b0e56 100644 --- a/tis-datax/tis-datax-odps-plugin/src/main/java/com/qlangtech/tis/plugin/datax/odps/JoinOdpsTask.java +++ b/tis-datax/tis-datax-odps-plugin/src/main/java/com/qlangtech/tis/plugin/datax/odps/JoinOdpsTask.java @@ -34,6 +34,7 @@ import com.qlangtech.tis.plugin.ds.DataSourceMeta; import com.qlangtech.tis.plugin.ds.IColMetaGetter; import com.qlangtech.tis.plugin.ds.IDataSourceFactoryGetter; +import com.qlangtech.tis.plugin.ds.JDBCConnection; import com.qlangtech.tis.sql.parser.ISqlTask; import com.qlangtech.tis.sql.parser.er.IPrimaryTabFinder; import com.qlangtech.tis.sql.parser.tuple.creator.EntityName; @@ -67,7 +68,7 @@ public JoinOdpsTask(DataXOdpsWriter odpsWriter, IDataSourceFactoryGetter dsFacto } @Override - protected void executeSql(String sql, DataSourceMeta.JDBCConnection conn) throws SQLException { + protected void executeSql(String sql, JDBCConnection conn) throws SQLException { OdpsDataSourceFactory dsFactory = (OdpsDataSourceFactory) dsFactoryGetter.getDataSourceFactory(); @@ -172,7 +173,7 @@ public boolean change(Instance.TaskStatus ts) { @Override protected List getHistoryPts( - DataSourceMeta mrEngine, DataSourceMeta.JDBCConnection conn, EntityName table) throws Exception { + DataSourceMeta mrEngine, JDBCConnection conn, EntityName table) throws Exception { OdpsDataSourceFactory dsFactory = (OdpsDataSourceFactory) mrEngine; Table tab = dsFactory.getOdpsTable(table, Optional.empty()); PartitionSpec ptSpec = null; @@ -189,7 +190,7 @@ protected List getHistoryPts( @Override protected void initializeTable(DataSourceMeta ds, ColsParser insertParser - , DataSourceMeta.JDBCConnection conn, EntityName dumpTable, Integer partitionRetainNum) throws Exception { + , JDBCConnection conn, EntityName dumpTable, Integer partitionRetainNum) throws Exception { OdpsDataSourceFactory dsFactory = (OdpsDataSourceFactory) ds; initializeTable(ds, conn, dumpTable, @@ -222,7 +223,7 @@ public void cleanHistoryTable() throws IOException { * @param conn */ private void createTable(OdpsDataSourceFactory dsFactory, EntityName dumpTable - , ColsParser insertParser, DataSourceMeta.JDBCConnection conn) { + , ColsParser insertParser, JDBCConnection conn) { // ISqlTask.RewriteSql rewriteSql = insertParser.getSql(); // String sql = rewriteSql.rewriteSql; // try { @@ -256,7 +257,7 @@ private void createTable(OdpsDataSourceFactory dsFactory, EntityName dumpTable } } - private boolean isTableSame(OdpsDataSourceFactory dsFactory, DataSourceMeta.JDBCConnection conn + private boolean isTableSame(OdpsDataSourceFactory dsFactory, JDBCConnection conn , ColsParser allCols, EntityName dumpTable) { TableSchema tabSchema = dsFactory.getTableSchema(dumpTable); for (HiveColumn col : allCols.getColsExcludePartitionCols()) { diff --git a/tis-datax/tis-datax-odps-plugin/src/main/java/com/qlangtech/tis/plugin/datax/odps/OdpsDataSourceFactory.java b/tis-datax/tis-datax-odps-plugin/src/main/java/com/qlangtech/tis/plugin/datax/odps/OdpsDataSourceFactory.java index e5300be88..9d4cd7638 100644 --- a/tis-datax/tis-datax-odps-plugin/src/main/java/com/qlangtech/tis/plugin/datax/odps/OdpsDataSourceFactory.java +++ b/tis-datax/tis-datax-odps-plugin/src/main/java/com/qlangtech/tis/plugin/datax/odps/OdpsDataSourceFactory.java @@ -41,6 +41,7 @@ import com.qlangtech.tis.plugin.ds.ColumnMetaData; import com.qlangtech.tis.plugin.ds.DBConfig; import com.qlangtech.tis.plugin.ds.DataSourceMeta; +import com.qlangtech.tis.plugin.ds.JDBCConnection; import com.qlangtech.tis.plugin.ds.TableNotFoundException; import com.qlangtech.tis.runtime.module.misc.IControlMsgHandler; import com.qlangtech.tis.sql.parser.tuple.creator.EntityName; @@ -167,7 +168,7 @@ public String getDbName() { } @Override - public DataSourceMeta.JDBCConnection getConnection(String jdbcUrl, boolean verify) throws SQLException { + public JDBCConnection createConnection(String jdbcUrl, boolean verify) throws SQLException { try { Class.forName(DRIVER_NAME); } catch (ClassNotFoundException e) { diff --git a/tis-datax/tis-datax-oracle-plugin/src/main/java/com/qlangtech/tis/plugin/ds/oracle/OracleDataSourceFactory.java b/tis-datax/tis-datax-oracle-plugin/src/main/java/com/qlangtech/tis/plugin/ds/oracle/OracleDataSourceFactory.java index f09c91b8b..62c170227 100644 --- a/tis-datax/tis-datax-oracle-plugin/src/main/java/com/qlangtech/tis/plugin/ds/oracle/OracleDataSourceFactory.java +++ b/tis-datax/tis-datax-oracle-plugin/src/main/java/com/qlangtech/tis/plugin/ds/oracle/OracleDataSourceFactory.java @@ -28,6 +28,7 @@ import com.qlangtech.tis.plugin.ds.DBConfig; import com.qlangtech.tis.plugin.ds.DataSourceFactory; import com.qlangtech.tis.plugin.ds.DataType; +import com.qlangtech.tis.plugin.ds.JDBCConnection; import com.qlangtech.tis.plugin.ds.JDBCTypes; import com.qlangtech.tis.plugin.ds.TableNotFoundException; import com.qlangtech.tis.runtime.module.misc.IFieldErrorHandler; @@ -261,7 +262,7 @@ protected DataType getDataType(String keyName) throws SQLException { @Override - public JDBCConnection getConnection(String jdbcUrl, boolean verify) throws SQLException { + public JDBCConnection createConnection(String jdbcUrl, boolean verify) throws SQLException { try { Class.forName("oracle.jdbc.OracleDriver"); } catch (ClassNotFoundException e) { diff --git a/tis-datax/tis-datax-postgresql-plugin/src/main/java/com/qlangtech/tis/plugin/ds/postgresql/PGDataSourceFactory.java b/tis-datax/tis-datax-postgresql-plugin/src/main/java/com/qlangtech/tis/plugin/ds/postgresql/PGDataSourceFactory.java index 53632a9b0..ff8dc8492 100644 --- a/tis-datax/tis-datax-postgresql-plugin/src/main/java/com/qlangtech/tis/plugin/ds/postgresql/PGDataSourceFactory.java +++ b/tis-datax/tis-datax-postgresql-plugin/src/main/java/com/qlangtech/tis/plugin/ds/postgresql/PGDataSourceFactory.java @@ -30,6 +30,7 @@ import com.qlangtech.tis.plugin.ds.ColumnMetaData; import com.qlangtech.tis.plugin.ds.DBConfig; import com.qlangtech.tis.plugin.ds.DataType; +import com.qlangtech.tis.plugin.ds.JDBCConnection; import com.qlangtech.tis.plugin.ds.TableInDB; import com.qlangtech.tis.plugin.ds.TableNotFoundException; import com.qlangtech.tis.runtime.module.misc.IControlMsgHandler; @@ -124,7 +125,7 @@ protected void refectTableInDB(TableInDB tabs, JDBCConnection conn) throws SQLEx } @Override - public JDBCConnection getConnection(String jdbcUrl, boolean verify) throws SQLException { + public JDBCConnection createConnection(String jdbcUrl, boolean verify) throws SQLException { try { Class.forName("org.postgresql.Driver"); } catch (ClassNotFoundException e) { diff --git a/tis-datax/tis-datax-sqlserver-plugin/src/main/java/com/qlangtech/tis/plugin/ds/sqlserver/SqlServerDatasourceFactory.java b/tis-datax/tis-datax-sqlserver-plugin/src/main/java/com/qlangtech/tis/plugin/ds/sqlserver/SqlServerDatasourceFactory.java index 8c0f4b42b..d3c2c7122 100644 --- a/tis-datax/tis-datax-sqlserver-plugin/src/main/java/com/qlangtech/tis/plugin/ds/sqlserver/SqlServerDatasourceFactory.java +++ b/tis-datax/tis-datax-sqlserver-plugin/src/main/java/com/qlangtech/tis/plugin/ds/sqlserver/SqlServerDatasourceFactory.java @@ -22,6 +22,7 @@ import com.qlangtech.tis.annotation.Public; import com.qlangtech.tis.plugin.ds.BasicDataSourceFactory; import com.qlangtech.tis.plugin.ds.DBConfig; +import com.qlangtech.tis.plugin.ds.JDBCConnection; import com.qlangtech.tis.runtime.module.misc.IFieldErrorHandler; import org.apache.commons.lang.StringUtils; @@ -61,7 +62,7 @@ public Optional getEscapeChar() { private transient java.sql.Driver driver; @Override - public JDBCConnection getConnection(String jdbcUrl, boolean verify) throws SQLException { + public JDBCConnection createConnection(String jdbcUrl, boolean verify) throws SQLException { if (driver == null) { driver = createDriver(); } diff --git a/tis-datax/tis-datax-starrocks-plugin/src/main/java/com/qlangtech/tis/plugin/ds/starrocks/BasicSourceFactory.java b/tis-datax/tis-datax-starrocks-plugin/src/main/java/com/qlangtech/tis/plugin/ds/starrocks/BasicSourceFactory.java index a8abd97c5..62cb714ba 100644 --- a/tis-datax/tis-datax-starrocks-plugin/src/main/java/com/qlangtech/tis/plugin/ds/starrocks/BasicSourceFactory.java +++ b/tis-datax/tis-datax-starrocks-plugin/src/main/java/com/qlangtech/tis/plugin/ds/starrocks/BasicSourceFactory.java @@ -28,6 +28,7 @@ import com.qlangtech.tis.plugin.ds.BasicDataSourceFactory; import com.qlangtech.tis.plugin.ds.DBConfig; import com.qlangtech.tis.plugin.ds.DataSourceMeta; +import com.qlangtech.tis.plugin.ds.JDBCConnection; import com.qlangtech.tis.runtime.module.misc.IControlMsgHandler; import com.qlangtech.tis.runtime.module.misc.IFieldErrorHandler; import org.apache.commons.lang.StringUtils; @@ -84,14 +85,14 @@ public String buidJdbcUrl(DBConfig db, String ip, String dbName) { @Override - public DataSourceMeta.JDBCConnection getConnection(String jdbcUrl, boolean verify) throws SQLException { + public JDBCConnection createConnection(String jdbcUrl, boolean verify) throws SQLException { Properties props = new Properties(); props.put("user", StringUtils.trimToEmpty(this.userName)); if (StringUtils.isNotEmpty(this.password)) { props.put("password", StringUtils.trimToEmpty(this.password)); } try { - return new DataSourceMeta.JDBCConnection(mysql5Driver.connect(jdbcUrl, props), jdbcUrl); + return new JDBCConnection(mysql5Driver.connect(jdbcUrl, props), jdbcUrl); } catch (SQLException e) { throw TisException.create(e.getMessage() + ",jdbcUrl:" + jdbcUrl + ",props:" + props.toString(), e); } diff --git a/tis-datax/tis-datax-starrocks-plugin/src/main/java/com/qlangtech/tis/plugin/ds/starrocks/StarRocksSourceFactory.java b/tis-datax/tis-datax-starrocks-plugin/src/main/java/com/qlangtech/tis/plugin/ds/starrocks/StarRocksSourceFactory.java index d1dd543e3..34af098da 100644 --- a/tis-datax/tis-datax-starrocks-plugin/src/main/java/com/qlangtech/tis/plugin/ds/starrocks/StarRocksSourceFactory.java +++ b/tis-datax/tis-datax-starrocks-plugin/src/main/java/com/qlangtech/tis/plugin/ds/starrocks/StarRocksSourceFactory.java @@ -21,6 +21,7 @@ import com.qlangtech.tis.annotation.Public; import com.qlangtech.tis.extension.TISExtension; import com.qlangtech.tis.plugin.ds.DataSourceMeta; +import com.qlangtech.tis.plugin.ds.JDBCConnection; import java.sql.SQLException; import java.util.Optional; @@ -35,8 +36,8 @@ public class StarRocksSourceFactory extends BasicSourceFactory { public static final String DISPLAY_NAME = "StarRocks"; @Override - public DataSourceMeta.JDBCConnection getConnection(String jdbcUrl, boolean verify) throws SQLException { - return super.getConnection(jdbcUrl, verify); + public JDBCConnection createConnection(String jdbcUrl, boolean verify) throws SQLException { + return super.createConnection(jdbcUrl, verify); } @Override diff --git a/tis-datax/tis-ds-mysql-plugin/src/main/java/com/qlangtech/tis/plugin/ds/mysql/MySQLDataSourceFactory.java b/tis-datax/tis-ds-mysql-plugin/src/main/java/com/qlangtech/tis/plugin/ds/mysql/MySQLDataSourceFactory.java index 90262e802..f31c37993 100644 --- a/tis-datax/tis-ds-mysql-plugin/src/main/java/com/qlangtech/tis/plugin/ds/mysql/MySQLDataSourceFactory.java +++ b/tis-datax/tis-ds-mysql-plugin/src/main/java/com/qlangtech/tis/plugin/ds/mysql/MySQLDataSourceFactory.java @@ -35,6 +35,7 @@ import com.qlangtech.tis.plugin.ds.FacadeDataSource; import com.qlangtech.tis.plugin.ds.IDataSourceDumper; import com.qlangtech.tis.plugin.ds.IFacadeDataSource; +import com.qlangtech.tis.plugin.ds.JDBCConnection; import com.qlangtech.tis.plugin.ds.JDBCTypes; import com.qlangtech.tis.plugin.ds.SplitTableStrategy; import com.qlangtech.tis.plugin.ds.TISTable; diff --git a/tis-datax/tis-ds-mysql-v5-plugin/src/main/java/com/qlangtech/tis/plugin/ds/mysql/MySQLV5DataSourceFactory.java b/tis-datax/tis-ds-mysql-v5-plugin/src/main/java/com/qlangtech/tis/plugin/ds/mysql/MySQLV5DataSourceFactory.java index a07000a04..60406290f 100644 --- a/tis-datax/tis-ds-mysql-v5-plugin/src/main/java/com/qlangtech/tis/plugin/ds/mysql/MySQLV5DataSourceFactory.java +++ b/tis-datax/tis-ds-mysql-v5-plugin/src/main/java/com/qlangtech/tis/plugin/ds/mysql/MySQLV5DataSourceFactory.java @@ -20,6 +20,7 @@ import com.qlangtech.tis.annotation.Public; import com.qlangtech.tis.extension.TISExtension; +import com.qlangtech.tis.plugin.ds.JDBCConnection; import org.apache.commons.lang3.StringUtils; import java.sql.SQLException; @@ -42,7 +43,7 @@ public class MySQLV5DataSourceFactory extends MySQLDataSourceFactory { private transient com.mysql.jdbc.Driver driver; @Override - public JDBCConnection getConnection(String jdbcUrl, boolean verify) throws SQLException { + public JDBCConnection createConnection(String jdbcUrl, boolean verify) throws SQLException { if (driver == null) { driver = new com.mysql.jdbc.Driver(); } diff --git a/tis-datax/tis-ds-mysql-v8-plugin/src/main/java/com/qlangtech/tis/plugin/ds/mysql/MySQLV8DataSourceFactory.java b/tis-datax/tis-ds-mysql-v8-plugin/src/main/java/com/qlangtech/tis/plugin/ds/mysql/MySQLV8DataSourceFactory.java index cbfcc4a7d..e497a04ad 100644 --- a/tis-datax/tis-ds-mysql-v8-plugin/src/main/java/com/qlangtech/tis/plugin/ds/mysql/MySQLV8DataSourceFactory.java +++ b/tis-datax/tis-ds-mysql-v8-plugin/src/main/java/com/qlangtech/tis/plugin/ds/mysql/MySQLV8DataSourceFactory.java @@ -23,6 +23,7 @@ import com.qlangtech.tis.annotation.Public; import com.qlangtech.tis.extension.TISExtension; import com.qlangtech.tis.plugin.ds.DataSourceFactory; +import com.qlangtech.tis.plugin.ds.JDBCConnection; import com.qlangtech.tis.sql.parser.tuple.creator.EntityName; import org.apache.commons.lang.StringUtils; @@ -43,7 +44,7 @@ public class MySQLV8DataSourceFactory extends MySQLDataSourceFactory implements private transient com.mysql.cj.jdbc.Driver mysql8Driver; @Override - public JDBCConnection getConnection(String jdbcUrl, boolean verify) throws SQLException { + public JDBCConnection createConnection(String jdbcUrl, boolean verify) throws SQLException { if (mysql8Driver == null) { mysql8Driver = new com.mysql.cj.jdbc.Driver(); diff --git a/tis-datax/tis-hive-flat-table-builder-plugin/src/main/java/com/qlangtech/tis/dump/hive/BindHiveTableTool.java b/tis-datax/tis-hive-flat-table-builder-plugin/src/main/java/com/qlangtech/tis/dump/hive/BindHiveTableTool.java index a6a4b180b..797ae14af 100644 --- a/tis-datax/tis-hive-flat-table-builder-plugin/src/main/java/com/qlangtech/tis/dump/hive/BindHiveTableTool.java +++ b/tis-datax/tis-hive-flat-table-builder-plugin/src/main/java/com/qlangtech/tis/dump/hive/BindHiveTableTool.java @@ -26,6 +26,7 @@ import com.qlangtech.tis.plugin.ds.ColumnMetaData; import com.qlangtech.tis.plugin.ds.DataSourceMeta; import com.qlangtech.tis.plugin.ds.DataType; +import com.qlangtech.tis.plugin.ds.JDBCConnection; import com.qlangtech.tis.plugin.ds.JDBCTypes; import com.qlangtech.tis.sql.parser.tuple.creator.EntityName; import org.apache.commons.io.IOUtils; @@ -50,7 +51,7 @@ public class BindHiveTableTool { private static final Logger logger = LoggerFactory.getLogger(HiveTableBuilder.class); - public static void bindHiveTables(DataSourceMeta engine, DataSourceMeta.JDBCConnection hiveConn, + public static void bindHiveTables(DataSourceMeta engine, JDBCConnection hiveConn, ITISFileSystem fileSystem, Map> hiveTables , String timestamp) { @@ -67,7 +68,7 @@ public static void bindHiveTables(DataSourceMeta engine, DataSourceMeta.JDBCConn } - public static void bindHiveTables(DataSourceMeta engine, DataSourceMeta.JDBCConnection hiveConn, + public static void bindHiveTables(DataSourceMeta engine, JDBCConnection hiveConn, ITISFileSystem fileSystem, Map> hiveTables , String timestamp, HiveTableBuilder.IsTableSchemaSame isTableSchemaSame, HiveTableBuilder.CreateHiveTableAndBindPartition createHiveTableAndBindPartition) { diff --git a/tis-datax/tis-hive-flat-table-builder-plugin/src/main/java/com/qlangtech/tis/dump/hive/HiveRemoveHistoryDataTask.java b/tis-datax/tis-hive-flat-table-builder-plugin/src/main/java/com/qlangtech/tis/dump/hive/HiveRemoveHistoryDataTask.java index f7ea41342..06b2599ec 100644 --- a/tis-datax/tis-hive-flat-table-builder-plugin/src/main/java/com/qlangtech/tis/dump/hive/HiveRemoveHistoryDataTask.java +++ b/tis-datax/tis-hive-flat-table-builder-plugin/src/main/java/com/qlangtech/tis/dump/hive/HiveRemoveHistoryDataTask.java @@ -25,6 +25,7 @@ import com.qlangtech.tis.order.dump.task.ITableDumpConstant; import com.qlangtech.tis.plugin.datax.MREngine; import com.qlangtech.tis.plugin.ds.DataSourceMeta; +import com.qlangtech.tis.plugin.ds.JDBCConnection; import com.qlangtech.tis.sql.parser.tuple.creator.EntityName; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; @@ -76,7 +77,7 @@ public HiveRemoveHistoryDataTask(MREngine mrEngine, ITISFileSystem fsFactory, Da * @param hiveConnection * @throws Exception */ - public void deleteHdfsHistoryFile(EntityName dumpTable, DataSourceMeta.JDBCConnection hiveConnection) { + public void deleteHdfsHistoryFile(EntityName dumpTable, JDBCConnection hiveConnection) { try { logger.info("start deleteHdfsHistoryFile data[{}] files", dumpTable); this.fileSystem.deleteHistoryFile(dumpTable); @@ -96,7 +97,7 @@ public void deleteHdfsHistoryFile(EntityName dumpTable, DataSourceMeta.JDBCConne * @param hiveConnection * @throws Exception */ - public void deleteHdfsHistoryFile(EntityName dumpTable, DataSourceMeta.JDBCConnection hiveConnection, String timestamp) { + public void deleteHdfsHistoryFile(EntityName dumpTable, JDBCConnection hiveConnection, String timestamp) { try { logger.info("start delete history data{} files", dumpTable); // this.deleteMetadata(dumpTable, timestamp); @@ -218,11 +219,11 @@ public void deleteHdfsHistoryFile(EntityName dumpTable, DataSourceMeta.JDBCConne // } - public void dropHistoryHiveTable(EntityName dumpTable, DataSourceMeta.JDBCConnection conn) { + public void dropHistoryHiveTable(EntityName dumpTable, JDBCConnection conn) { this.dropHistoryHiveTable(dumpTable, conn, ITableDumpConstant.MAX_PARTITION_SAVE); } - public List dropHistoryHiveTable(EntityName dumpTable, DataSourceMeta.JDBCConnection conn, Integer partitionRetainNum) { + public List dropHistoryHiveTable(EntityName dumpTable, JDBCConnection conn, Integer partitionRetainNum) { return this.dropHistoryHiveTable(dumpTable, conn, (r) -> true, partitionRetainNum); } @@ -230,7 +231,7 @@ public List dropHistoryHiveTable(EntityName dumpTab * 删除hive中的历史表 */ public List dropHistoryHiveTable( - EntityName dumpTable, DataSourceMeta.JDBCConnection conn, PartitionFilter filter, Integer maxPartitionSave) { + EntityName dumpTable, JDBCConnection conn, PartitionFilter filter, Integer maxPartitionSave) { if (maxPartitionSave < 1) { throw new IllegalArgumentException("param maxPartitionSave can not small than 1"); } @@ -279,11 +280,11 @@ private String getFullTabName(EntityName table) { return table.getFullName((this.ds.getEscapeChar())); } - public static List getHistoryPts(DataSourceMeta mrEngine, DataSourceMeta.JDBCConnection conn, final EntityName table) throws Exception { + public static List getHistoryPts(DataSourceMeta mrEngine, JDBCConnection conn, final EntityName table) throws Exception { return getHistoryPts(mrEngine, conn, (ps) -> true, table); } - private static List getHistoryPts(DataSourceMeta mrEngine, DataSourceMeta.JDBCConnection conn, PartitionFilter filter, final EntityName table) throws Exception { + private static List getHistoryPts(DataSourceMeta mrEngine, JDBCConnection conn, PartitionFilter filter, final EntityName table) throws Exception { final Set ptSet = new HashSet<>(); final String showPartition = "show partitions " + table.getFullName((mrEngine.getEscapeChar())); final Pattern ptPattern = Pattern.compile(pt + "=(\\d+)"); diff --git a/tis-datax/tis-hive-flat-table-builder-plugin/src/main/java/com/qlangtech/tis/dump/hive/HiveTableBuilder.java b/tis-datax/tis-hive-flat-table-builder-plugin/src/main/java/com/qlangtech/tis/dump/hive/HiveTableBuilder.java index dbe5961b9..991ae465b 100644 --- a/tis-datax/tis-hive-flat-table-builder-plugin/src/main/java/com/qlangtech/tis/dump/hive/HiveTableBuilder.java +++ b/tis-datax/tis-hive-flat-table-builder-plugin/src/main/java/com/qlangtech/tis/dump/hive/HiveTableBuilder.java @@ -25,6 +25,7 @@ import com.qlangtech.tis.hive.HdfsFormat; import com.qlangtech.tis.hive.HiveColumn; import com.qlangtech.tis.plugin.ds.DataSourceMeta; +import com.qlangtech.tis.plugin.ds.JDBCConnection; import com.qlangtech.tis.sql.parser.tuple.creator.EntityName; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; @@ -86,7 +87,7 @@ public HiveTableBuilder(String timestamp, HdfsFormat fsFileFormat) { // * @return // * @throws Exception // */ -// public static List getExistTables(MREngine mrEngine, DataSourceMeta.JDBCConnection conn, String dbName) throws Exception { +// public static List getExistTables(MREngine mrEngine, JDBCConnection conn, String dbName) throws Exception { // final List tables = new ArrayList<>(); // if (!HiveTask.isDBExists(mrEngine, conn, dbName)) { // // DB都不存在,table肯定就不存在啦 @@ -106,7 +107,7 @@ public HiveTableBuilder(String timestamp, HdfsFormat fsFileFormat) { * @throws Exception */ public StringBuffer createHiveTableAndBindPartition( - DataSourceMeta sourceMeta, DataSourceMeta.JDBCConnection conn, EntityName table, List cols, SQLCommandTailAppend sqlCommandTailAppend) throws Exception { + DataSourceMeta sourceMeta, JDBCConnection conn, EntityName table, List cols, SQLCommandTailAppend sqlCommandTailAppend) throws Exception { if (StringUtils.isEmpty(table.getDbName())) { throw new IllegalArgumentException("table.getDbName can not be empty"); } @@ -160,7 +161,7 @@ public StringBuffer createHiveTableAndBindPartition( * @param hiveTables * @throws Exception */ - public void bindHiveTables(DataSourceMeta engine, Map> hiveTables, DataSourceMeta.JDBCConnection conn) throws Exception { + public void bindHiveTables(DataSourceMeta engine, Map> hiveTables, JDBCConnection conn) throws Exception { bindHiveTables(engine, hiveTables, conn, (columns, hiveTable) -> { return isTableSame(engine, conn, columns.colsMeta, hiveTable); @@ -177,7 +178,7 @@ public void bindHiveTables(DataSourceMeta engine, Map> hiveTables, DataSourceMeta.JDBCConnection conn + public void bindHiveTables(DataSourceMeta engine, Map> hiveTables, JDBCConnection conn , IsTableSchemaSame isTableSchemaSame, CreateHiveTableAndBindPartition createHiveTableAndBindPartition) throws Exception { try { @@ -229,7 +230,7 @@ public interface IsTableSchemaSame { } - public static boolean isTableSame(DataSourceMeta mrEngine, DataSourceMeta.JDBCConnection conn, List columns, EntityName tableName) throws Exception { + public static boolean isTableSame(DataSourceMeta mrEngine, JDBCConnection conn, List columns, EntityName tableName) throws Exception { boolean isTableSame; final StringBuffer errMsg = new StringBuffer(); final StringBuffer equalsCols = new StringBuffer("compar equals:"); @@ -289,7 +290,7 @@ public boolean callback(ResultSet result) throws Exception { * @param * @throws Exception */ - void bindPartition(DataSourceMeta.JDBCConnection conn, BindHiveTableTool.HiveBindConfig columns, EntityName table, int startIndex) throws Exception { + void bindPartition(JDBCConnection conn, BindHiveTableTool.HiveBindConfig columns, EntityName table, int startIndex) throws Exception { visitSubPmodPath(table, columns, startIndex, (pmod, path) -> { String sql = "alter table " + table + " add if not exists partition(" + IDumpTable.PARTITION_PT + "='" @@ -325,7 +326,7 @@ private interface FSPathVisitor { } - private void createHiveTableAndBindPartition(DataSourceMeta sourceMeta, DataSourceMeta.JDBCConnection conn, BindHiveTableTool.HiveBindConfig columns, EntityName tableName) throws Exception { + private void createHiveTableAndBindPartition(DataSourceMeta sourceMeta, JDBCConnection conn, BindHiveTableTool.HiveBindConfig columns, EntityName tableName) throws Exception { createHiveTableAndBindPartition(sourceMeta, conn, tableName, columns.colsMeta, (hiveSQl) -> { //final String hivePath = tableName.getNameWithPath(); int startIndex = 0; diff --git a/tis-datax/tis-hive-flat-table-builder-plugin/src/main/java/com/qlangtech/tis/fullbuild/taskflow/hive/JoinHiveTask.java b/tis-datax/tis-hive-flat-table-builder-plugin/src/main/java/com/qlangtech/tis/fullbuild/taskflow/hive/JoinHiveTask.java index 0c270e61d..c9cc6ea0f 100644 --- a/tis-datax/tis-hive-flat-table-builder-plugin/src/main/java/com/qlangtech/tis/fullbuild/taskflow/hive/JoinHiveTask.java +++ b/tis-datax/tis-hive-flat-table-builder-plugin/src/main/java/com/qlangtech/tis/fullbuild/taskflow/hive/JoinHiveTask.java @@ -34,6 +34,7 @@ import com.qlangtech.tis.plugin.ds.ColumnMetaData; import com.qlangtech.tis.plugin.ds.DataSourceMeta; import com.qlangtech.tis.plugin.ds.IDataSourceFactoryGetter; +import com.qlangtech.tis.plugin.ds.JDBCConnection; import com.qlangtech.tis.sql.parser.ISqlTask; import com.qlangtech.tis.sql.parser.er.IPrimaryTabFinder; import com.qlangtech.tis.sql.parser.tuple.creator.EntityName; @@ -71,13 +72,13 @@ public JoinHiveTask(ISqlTask nodeMeta, boolean isFinalNode, Supplier getHistoryPts( - DataSourceMeta mrEngine, DataSourceMeta.JDBCConnection conn, EntityName table) throws Exception { + DataSourceMeta mrEngine, JDBCConnection conn, EntityName table) throws Exception { return HiveRemoveHistoryDataTask.getHistoryPts(mrEngine, conn, table); } @@ -92,7 +93,7 @@ protected List getHistoryPts( @Override protected void initializeTable(DataSourceMeta ds , ColsParser insertParser - , DataSourceMeta.JDBCConnection conn, EntityName dumpTable, Integer partitionRetainNum) throws Exception { + , JDBCConnection conn, EntityName dumpTable, Integer partitionRetainNum) throws Exception { final String path = FSHistoryFileUtils.getJoinTableStorePath(fileSystem.getRootDir(), dumpTable); if (fileSystem == null) { @@ -123,7 +124,7 @@ public void cleanHistoryTable() throws IOException { public static void cleanHistoryTable(ITISFileSystem fileSystem, IPath parentPath - , DataSourceMeta mrEngine, DataSourceMeta.JDBCConnection conn, EntityName dumpTable, Integer partitionRetainNum) throws IOException { + , DataSourceMeta mrEngine, JDBCConnection conn, EntityName dumpTable, Integer partitionRetainNum) throws IOException { // 表结构没有变化,需要清理表中的历史数据 清理历史hdfs数据 //this.fs2Table.deleteHistoryFile(dumpTable, this.getTaskContext()); // 清理hive数据 @@ -143,7 +144,7 @@ protected AbstractInsertFromSelectParser createInsertSQLParser(String sql, Funct * 创建hive表 */ public static void createHiveTable(ITISFileSystem fileSystem, HdfsFormat fsFormat - , DataSourceMeta sourceMeta, EntityName dumpTable, List cols, DataSourceMeta.JDBCConnection conn) { + , DataSourceMeta sourceMeta, EntityName dumpTable, List cols, JDBCConnection conn) { try { HiveTableBuilder tableBuilder = new HiveTableBuilder("0", fsFormat); tableBuilder.createHiveTableAndBindPartition(sourceMeta, conn, dumpTable, cols, (hiveSQl) -> { diff --git a/tis-datax/tis-hive-flat-table-builder-plugin/src/main/java/com/qlangtech/tis/offline/flattable/HiveFlatTableBuilder.java b/tis-datax/tis-hive-flat-table-builder-plugin/src/main/java/com/qlangtech/tis/offline/flattable/HiveFlatTableBuilder.java index 49f412e33..19138b638 100644 --- a/tis-datax/tis-hive-flat-table-builder-plugin/src/main/java/com/qlangtech/tis/offline/flattable/HiveFlatTableBuilder.java +++ b/tis-datax/tis-hive-flat-table-builder-plugin/src/main/java/com/qlangtech/tis/offline/flattable/HiveFlatTableBuilder.java @@ -22,6 +22,7 @@ import com.qlangtech.tis.dump.hive.HiveDBUtils; import com.qlangtech.tis.hive.DefaultHiveConnGetter; import com.qlangtech.tis.plugin.ds.DataSourceMeta; +import com.qlangtech.tis.plugin.ds.JDBCConnection; import com.qlangtech.tis.runtime.module.misc.IControlMsgHandler; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.exception.ExceptionUtils; @@ -164,7 +165,7 @@ public static boolean validateHiveAvailable(IControlMsgHandler msgHandler, Conte // } // } - DataSourceMeta.JDBCConnection conn = null; + JDBCConnection conn = null; try { conn = HiveDBUtils.getInstance(hiveAddress, dbName, params.getUserToken()).createConnection(); diff --git a/tis-datax/tis-hive-flat-table-builder-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXHiveWriter.java b/tis-datax/tis-hive-flat-table-builder-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXHiveWriter.java index 0b32dab9f..ca46fc89e 100644 --- a/tis-datax/tis-hive-flat-table-builder-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXHiveWriter.java +++ b/tis-datax/tis-hive-flat-table-builder-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXHiveWriter.java @@ -65,6 +65,7 @@ import com.qlangtech.tis.plugin.ds.IColMetaGetter; import com.qlangtech.tis.plugin.ds.IDataSourceFactoryGetter; import com.qlangtech.tis.plugin.ds.ISelectedTab; +import com.qlangtech.tis.plugin.ds.JDBCConnection; import com.qlangtech.tis.runtime.module.misc.IFieldErrorHandler; import com.qlangtech.tis.sql.parser.ISqlTask; import com.qlangtech.tis.sql.parser.TabPartitions; @@ -137,12 +138,12 @@ public TimeFormat getPsFormat() { public ExecuteResult startTask(ITableBuildTask dumpTask) { try { - try (DataSourceMeta.JDBCConnection conn = getConnection()) { + try (JDBCConnection conn = getConnection()) { HiveDBUtils.executeNoLog(conn, "SET hive.exec.dynamic.partition = true"); HiveDBUtils.executeNoLog(conn, "SET hive.exec.dynamic.partition.mode = nonstrict"); return dumpTask.process(new ITaskContext() { @Override - public DataSourceMeta.JDBCConnection getObj() { + public JDBCConnection getObj() { return conn; } }); @@ -231,7 +232,7 @@ public static String getDftTemplate() { return IOUtils.loadResourceFromClasspath(DataXHiveWriter.class, "DataXHiveWriter-tpl.json"); } - public DataSourceMeta.JDBCConnection getConnection() { + public JDBCConnection getConnection() { Hiveserver2DataSourceFactory dsFactory = getDataSourceFactory(); String jdbcUrl = dsFactory.getJdbcUrl(); try { @@ -398,7 +399,7 @@ private void bindHiveTables(IExecChainContext execContext, ISelectedTab tab) { if (!execContext.isDryRun()) { Path tabDumpParentPath = getTabDumpParentPath(execContext, tab); Hiveserver2DataSourceFactory dsFactory = this.getDataSourceFactory(); - try (DataSourceMeta.JDBCConnection hiveConn = this.getConnection()) { + try (JDBCConnection hiveConn = this.getConnection()) { final Path dumpParentPath = tabDumpParentPath; BindHiveTableTool.bindHiveTables(dsFactory, hiveConn, this.getFs().getFileSystem() , Collections.singletonMap(dumpTable, () -> { diff --git a/tis-datax/tis-hive-flat-table-builder-plugin/src/test/java/com/qlangtech/tis/dump/hive/TestHiveDBUtils.java b/tis-datax/tis-hive-flat-table-builder-plugin/src/test/java/com/qlangtech/tis/dump/hive/TestHiveDBUtils.java index 3d3f44300..b8505513b 100644 --- a/tis-datax/tis-hive-flat-table-builder-plugin/src/test/java/com/qlangtech/tis/dump/hive/TestHiveDBUtils.java +++ b/tis-datax/tis-hive-flat-table-builder-plugin/src/test/java/com/qlangtech/tis/dump/hive/TestHiveDBUtils.java @@ -19,6 +19,7 @@ package com.qlangtech.tis.dump.hive; import com.qlangtech.tis.plugin.ds.DataSourceMeta; +import com.qlangtech.tis.plugin.ds.JDBCConnection; import junit.framework.TestCase; import java.sql.ResultSet; @@ -33,7 +34,7 @@ public class TestHiveDBUtils extends TestCase { public void testTetConnection() throws Exception { HiveDBUtils dbUtils = HiveDBUtils.getInstance("192.168.28.200:10000", "tis"); - try (DataSourceMeta.JDBCConnection con = dbUtils.createConnection()) { + try (JDBCConnection con = dbUtils.createConnection()) { // // Connection con = DriverManager.getConnection( // // "jdbc:hive://10.1.6.211:10000/tis", "", ""); diff --git a/tis-datax/tis-hive-flat-table-builder-plugin/src/test/java/com/qlangtech/tis/dump/hive/TestHiveRemoveHistoryDataTask.java b/tis-datax/tis-hive-flat-table-builder-plugin/src/test/java/com/qlangtech/tis/dump/hive/TestHiveRemoveHistoryDataTask.java index ca6986615..5b4412139 100644 --- a/tis-datax/tis-hive-flat-table-builder-plugin/src/test/java/com/qlangtech/tis/dump/hive/TestHiveRemoveHistoryDataTask.java +++ b/tis-datax/tis-hive-flat-table-builder-plugin/src/test/java/com/qlangtech/tis/dump/hive/TestHiveRemoveHistoryDataTask.java @@ -23,6 +23,7 @@ import com.qlangtech.tis.fs.ITISFileSystem; import com.qlangtech.tis.plugin.datax.MREngine; import com.qlangtech.tis.plugin.ds.DataSourceMeta; +import com.qlangtech.tis.plugin.ds.JDBCConnection; import com.qlangtech.tis.sql.parser.tuple.creator.EntityName; import junit.framework.TestCase; import org.easymock.EasyMock; @@ -47,7 +48,7 @@ public void testDropHistoryHiveTable() throws Exception { DataSourceMeta sourceMeta = EasyMock.createMock("sourceMeta", DataSourceMeta.class); ITISFileSystem fileSystem = EasyMock.createMock("fileSystem", ITISFileSystem.class); - DataSourceMeta.JDBCConnection hiveConn = EasyMock.createMock("hiveConn", DataSourceMeta.JDBCConnection.class); + JDBCConnection hiveConn = EasyMock.createMock("hiveConn", JDBCConnection.class); Statement showDBStatment = EasyMock.createMock("showDBStatment", Statement.class); ResultSet resultSet = EasyMock.createMock("resultSet", ResultSet.class); diff --git a/tis-datax/tis-hive-flat-table-builder-plugin/src/test/java/com/qlangtech/tis/dump/hive/TestHiveTableBuilder.java b/tis-datax/tis-hive-flat-table-builder-plugin/src/test/java/com/qlangtech/tis/dump/hive/TestHiveTableBuilder.java index 294a1da1b..26dd52425 100644 --- a/tis-datax/tis-hive-flat-table-builder-plugin/src/test/java/com/qlangtech/tis/dump/hive/TestHiveTableBuilder.java +++ b/tis-datax/tis-hive-flat-table-builder-plugin/src/test/java/com/qlangtech/tis/dump/hive/TestHiveTableBuilder.java @@ -27,6 +27,7 @@ import com.qlangtech.tis.hive.TestHiveInsertFromSelectParser; import com.qlangtech.tis.plugin.ds.ColumnMetaData; import com.qlangtech.tis.plugin.ds.DataSourceMeta; +import com.qlangtech.tis.plugin.ds.JDBCConnection; import com.qlangtech.tis.sql.parser.tuple.creator.EntityName; import junit.framework.TestCase; import org.easymock.EasyMock; @@ -55,7 +56,7 @@ public void testCreateHiveTableAndBindPartition() throws Exception { EasyMock.expect(sourceMeta.removeEscapeChar(c.getName())).andReturn(c.getName()); } - DataSourceMeta.JDBCConnection conn = EasyMock.mock("conn", DataSourceMeta.JDBCConnection.class); + JDBCConnection conn = EasyMock.mock("conn", JDBCConnection.class); Connection connection = EasyMock.mock("connection", Connection.class); Statement statement = EasyMock.mock("statement", Statement.class); diff --git a/tis-datax/tis-hive-flat-table-builder-plugin/src/test/java/com/qlangtech/tis/plugin/datax/TestDataXHiveWriterDump.java b/tis-datax/tis-hive-flat-table-builder-plugin/src/test/java/com/qlangtech/tis/plugin/datax/TestDataXHiveWriterDump.java index 513542673..8027d1ea4 100644 --- a/tis-datax/tis-hive-flat-table-builder-plugin/src/test/java/com/qlangtech/tis/plugin/datax/TestDataXHiveWriterDump.java +++ b/tis-datax/tis-hive-flat-table-builder-plugin/src/test/java/com/qlangtech/tis/plugin/datax/TestDataXHiveWriterDump.java @@ -43,6 +43,7 @@ import com.qlangtech.tis.plugin.common.WriterTemplate; import com.qlangtech.tis.plugin.datax.impl.TextFSFormat; import com.qlangtech.tis.plugin.ds.DataSourceMeta; +import com.qlangtech.tis.plugin.ds.JDBCConnection; import com.qlangtech.tis.sql.parser.TabPartitions; import org.apache.commons.io.FileUtils; import org.easymock.EasyMock; @@ -123,7 +124,7 @@ public Class getOwnerClass() { IDataxProcessor processor = EasyMock.mock("processor", IDataxProcessor.class); File ddlDir = folder.newFolder("ddlDir"); - CreateTableSqlBuilder.CreateDDL createDDL = dataxWriter.generateCreateDDL(applicationTab); + CreateTableSqlBuilder.CreateDDL createDDL = dataxWriter.generateCreateDDL(applicationTab, Optional.empty()); Assert.assertNotNull("createDDL can not be null", createDDL); FileUtils.write(new File(ddlDir, applicationTab.getTo() + DataXCfgFile.DATAX_CREATE_DDL_FILE_NAME_SUFFIX) @@ -142,7 +143,7 @@ public Class getOwnerClass() { preExec.run(); WriterTemplate.realExecuteDump(TestDataXHiveWriter.mysql2hiveDataXName - , DataXCfgJson.path(TestDataXHiveWriterDump.class,"hive-datax-writer-assert-without-option-val.json"), dataxWriter); + , DataXCfgJson.path(TestDataXHiveWriterDump.class, "hive-datax-writer-assert-without-option-val.json"), dataxWriter); IRemoteTaskTrigger postExec = dataxWriter.createPostTask(execContext, applicationTab.getSourceTab(), null); postExec.run(); @@ -154,7 +155,7 @@ public Class getOwnerClass() { Assert.assertTrue(applicationPS.isPresent()); Assert.assertEquals(pt, applicationPS.get().pt.getPt()); - try (DataSourceMeta.JDBCConnection connection = dataxWriter.getConnection()) { + try (JDBCConnection connection = dataxWriter.getConnection()) { connection.query("select count(1) from " + applicationTab.getTo() + " where " + IDumpTable.PARTITION_PT + "='" + pt + "'" , (result) -> { diff --git a/tis-datax/tis-hive-plugin/src/main/java/com/qlangtech/tis/dump/hive/HiveDBUtils.java b/tis-datax/tis-hive-plugin/src/main/java/com/qlangtech/tis/dump/hive/HiveDBUtils.java index ed80ba1fc..e31396ff7 100644 --- a/tis-datax/tis-hive-plugin/src/main/java/com/qlangtech/tis/dump/hive/HiveDBUtils.java +++ b/tis-datax/tis-hive-plugin/src/main/java/com/qlangtech/tis/dump/hive/HiveDBUtils.java @@ -28,6 +28,7 @@ import com.qlangtech.tis.hive.Hms; import com.qlangtech.tis.job.common.JobCommon; import com.qlangtech.tis.plugin.ds.DataSourceMeta; +import com.qlangtech.tis.plugin.ds.JDBCConnection; import org.apache.commons.dbcp.BasicDataSource; import org.apache.commons.dbcp.ConnectionFactory; import org.apache.commons.dbcp.DelegatingStatement; @@ -203,14 +204,14 @@ public Connection createConnection() throws SQLException { return hiveDatasource; } - public DataSourceMeta.JDBCConnection createConnection() { + public JDBCConnection createConnection() { return createConnection(0); } - public DataSourceMeta.JDBCConnection createConnection(int retry) { - DataSourceMeta.JDBCConnection conn = null; + public JDBCConnection createConnection(int retry) { + JDBCConnection conn = null; try { - conn = new DataSourceMeta.JDBCConnection(hiveDatasource.getConnection(), hiveJdbcUrl); + conn = new JDBCConnection(hiveDatasource.getConnection(), hiveJdbcUrl); executeNoLog(conn, "set hive.exec.dynamic.partition.mode=nonstrict"); return conn; } catch (Exception e) { @@ -233,22 +234,22 @@ public DataSourceMeta.JDBCConnection createConnection(int retry) { } } - public void close(DataSourceMeta.JDBCConnection conn) { + public void close(JDBCConnection conn) { try { conn.close(); } catch (Throwable e) { } } - public static boolean execute(DataSourceMeta.JDBCConnection conn, String sql, IJoinTaskStatus joinTaskStatus) throws SQLException { + public static boolean execute(JDBCConnection conn, String sql, IJoinTaskStatus joinTaskStatus) throws SQLException { return execute(conn, sql, true, /* listenLog */ joinTaskStatus); } - public static boolean execute(DataSourceMeta.JDBCConnection conn, String sql) throws SQLException { + public static boolean execute(JDBCConnection conn, String sql) throws SQLException { return execute(conn, sql, new JoinTaskStatus("dump")); } - public static boolean executeNoLog(DataSourceMeta.JDBCConnection conn, String sql) throws SQLException { + public static boolean executeNoLog(JDBCConnection conn, String sql) throws SQLException { return execute(conn, sql, false, /* listenLog */ new JoinTaskStatus("dump")); } @@ -260,7 +261,7 @@ public static boolean executeNoLog(DataSourceMeta.JDBCConnection conn, String sq * @return * @throws Exception */ - private static boolean execute(DataSourceMeta.JDBCConnection conn, String sql, boolean listenLog, IJoinTaskStatus joinTaskStatus) throws SQLException { + private static boolean execute(JDBCConnection conn, String sql, boolean listenLog, IJoinTaskStatus joinTaskStatus) throws SQLException { synchronized (HiveDBUtils.class) { try (Statement stmt = conn.getConnection().createStatement()) { // Future f = null;// exec.submit(createLogRunnable(stmt)); @@ -368,7 +369,7 @@ public static void main(String[] args) throws Exception { HiveDBUtils dbUtils = HiveDBUtils.getInstance("192.168.28.200", "tis"); - DataSourceMeta.JDBCConnection con = dbUtils.createConnection(); + JDBCConnection con = dbUtils.createConnection(); // // Connection con = DriverManager.getConnection( // // "jdbc:hive://10.1.6.211:10000/tis", "", ""); diff --git a/tis-datax/tis-hive-plugin/src/main/java/com/qlangtech/tis/hive/DefaultHiveConnGetter.java b/tis-datax/tis-hive-plugin/src/main/java/com/qlangtech/tis/hive/DefaultHiveConnGetter.java index 563ebd199..a169bb0aa 100644 --- a/tis-datax/tis-hive-plugin/src/main/java/com/qlangtech/tis/hive/DefaultHiveConnGetter.java +++ b/tis-datax/tis-hive-plugin/src/main/java/com/qlangtech/tis/hive/DefaultHiveConnGetter.java @@ -39,6 +39,7 @@ import com.qlangtech.tis.plugin.annotation.FormFieldType; import com.qlangtech.tis.plugin.annotation.Validator; import com.qlangtech.tis.plugin.ds.DataSourceMeta; +import com.qlangtech.tis.plugin.ds.JDBCConnection; import com.qlangtech.tis.runtime.module.misc.IControlMsgHandler; import com.qlangtech.tis.runtime.module.misc.IFieldErrorHandler; import org.apache.commons.lang.StringUtils; @@ -118,7 +119,7 @@ private static boolean validateHiveAvailable(IControlMsgHandler msgHandler, Cont // } // } - DataSourceMeta.JDBCConnection conn = null; + JDBCConnection conn = null; try { conn = HiveDBUtils.getInstance(hiveAddress, dbName, params.getUserToken()).createConnection(); @@ -161,7 +162,7 @@ public String getMetaStoreUrls() { } @Override - public DataSourceMeta.JDBCConnection createConfigInstance() { + public JDBCConnection createConfigInstance() { try { return HiveDBUtils.getInstance(this.hiveAddress, this.dbName, getUserToken()).createConnection(); } catch (Throwable e) { diff --git a/tis-datax/tis-hive-plugin/src/main/java/com/qlangtech/tis/hive/Hiveserver2DataSourceFactory.java b/tis-datax/tis-hive-plugin/src/main/java/com/qlangtech/tis/hive/Hiveserver2DataSourceFactory.java index 0b68140ee..7eded9860 100644 --- a/tis-datax/tis-hive-plugin/src/main/java/com/qlangtech/tis/hive/Hiveserver2DataSourceFactory.java +++ b/tis-datax/tis-hive-plugin/src/main/java/com/qlangtech/tis/hive/Hiveserver2DataSourceFactory.java @@ -32,6 +32,7 @@ import com.qlangtech.tis.plugin.ds.ColumnMetaData; import com.qlangtech.tis.plugin.ds.DBConfig; import com.qlangtech.tis.plugin.ds.DataSourceFactory; +import com.qlangtech.tis.plugin.ds.JDBCConnection; import com.qlangtech.tis.plugin.ds.JdbcUrlBuilder; import com.qlangtech.tis.plugin.ds.TableInDB; import com.qlangtech.tis.plugin.ds.TableNotFoundException; @@ -129,7 +130,7 @@ public UserToken getUserToken() { } @Override - public JDBCConnection getConnection(String jdbcUrl, boolean verify) throws SQLException { + public JDBCConnection createConnection(String jdbcUrl, boolean verify) throws SQLException { return getConnection(jdbcUrl, false, verify); } diff --git a/tis-datax/tis-hive-plugin/src/main/java/com/qlangtech/tis/hive/Hms.java b/tis-datax/tis-hive-plugin/src/main/java/com/qlangtech/tis/hive/Hms.java index 560babc55..1a3ee5565 100644 --- a/tis-datax/tis-hive-plugin/src/main/java/com/qlangtech/tis/hive/Hms.java +++ b/tis-datax/tis-hive-plugin/src/main/java/com/qlangtech/tis/hive/Hms.java @@ -28,6 +28,7 @@ import com.qlangtech.tis.plugin.annotation.FormField; import com.qlangtech.tis.plugin.annotation.Validator; import com.qlangtech.tis.plugin.ds.DataSourceMeta; +import com.qlangtech.tis.plugin.ds.JDBCConnection; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hive.jdbc.HiveDriver; @@ -50,13 +51,13 @@ public class Hms implements Describable { @FormField(ordinal = 5, validate = {Validator.require}) public UserToken userToken; - public static DataSourceMeta.JDBCConnection createConnection(String jdbcUrl, UserToken userToken) throws Exception { + public static JDBCConnection createConnection(String jdbcUrl, UserToken userToken) throws Exception { HiveDriver hiveDriver = new HiveDriver(); Properties props = new Properties(); StringBuffer jdbcUrlBuffer = new StringBuffer(jdbcUrl); - return userToken.accept(new IUserTokenVisitor() { + return userToken.accept(new IUserTokenVisitor() { @Override - public DataSourceMeta.JDBCConnection visit(IUserNamePasswordUserToken ut) throws Exception { + public JDBCConnection visit(IUserNamePasswordUserToken ut) throws Exception { props.setProperty(Utils.JdbcConnectionParams.AUTH_USER, ut.getUserName()); props.setProperty(Utils.JdbcConnectionParams.AUTH_PASSWD, ut.getPassword()); UserGroupInformation.setConfiguration(new HiveConf()); @@ -64,13 +65,13 @@ public DataSourceMeta.JDBCConnection visit(IUserNamePasswordUserToken ut) throws } @Override - public DataSourceMeta.JDBCConnection visit(IOffUserToken token) throws Exception { + public JDBCConnection visit(IOffUserToken token) throws Exception { UserGroupInformation.setConfiguration(new HiveConf()); return createConnection(hiveDriver, props, jdbcUrlBuffer); } @Override - public DataSourceMeta.JDBCConnection visit(IKerberosUserToken token) throws Exception { + public JDBCConnection visit(IKerberosUserToken token) throws Exception { IKerberos kerberosCfg = token.getKerberosCfg(); jdbcUrlBuffer.append(";principal=") .append(kerberosCfg.getPrincipal()); @@ -90,14 +91,14 @@ public DataSourceMeta.JDBCConnection visit(IKerberosUserToken token) throws Exce }); } - private static DataSourceMeta.JDBCConnection createConnection( + private static JDBCConnection createConnection( HiveDriver hiveDriver, Properties props, StringBuffer jdbcUrlBuffer) throws SQLException { String jdbcUrl = jdbcUrlBuffer.toString(); - return new DataSourceMeta.JDBCConnection(hiveDriver.connect(jdbcUrl, props), jdbcUrl); + return new JDBCConnection(hiveDriver.connect(jdbcUrl, props), jdbcUrl); } - public DataSourceMeta.JDBCConnection getConnection(String jdbcUrl, String dbName, boolean usingPool) throws SQLException { + public JDBCConnection getConnection(String jdbcUrl, String dbName, boolean usingPool) throws SQLException { final ClassLoader currentLoader = Thread.currentThread().getContextClassLoader(); try { Thread.currentThread().setContextClassLoader(Hiveserver2DataSourceFactory.class.getClassLoader()); diff --git a/tis-datax/tis-hive-plugin/src/test/java/com/qlangtech/tis/dump/hive/TestHiveDBUtils.java b/tis-datax/tis-hive-plugin/src/test/java/com/qlangtech/tis/dump/hive/TestHiveDBUtils.java index 7a370c5ed..a10bdab10 100644 --- a/tis-datax/tis-hive-plugin/src/test/java/com/qlangtech/tis/dump/hive/TestHiveDBUtils.java +++ b/tis-datax/tis-hive-plugin/src/test/java/com/qlangtech/tis/dump/hive/TestHiveDBUtils.java @@ -19,6 +19,7 @@ package com.qlangtech.tis.dump.hive; import com.qlangtech.tis.plugin.ds.DataSourceMeta; +import com.qlangtech.tis.plugin.ds.JDBCConnection; import org.junit.Test; import java.sql.ResultSet; @@ -40,7 +41,7 @@ public void test1() throws Exception { public void testConn() throws Exception { HiveDBUtils dbUtils = HiveDBUtils.getInstance("192.168.28.200:10000", "default"); - try (DataSourceMeta.JDBCConnection conn = dbUtils.createConnection()) { + try (JDBCConnection conn = dbUtils.createConnection()) { Statement statement = conn.createStatement(); try (ResultSet resultSet = statement.executeQuery("select count(instance_id) as countt,`type` FROM instancedetail where 1=0 group by type")) { diff --git a/tis-datax/tis-hive-plugin/src/test/java/com/qlangtech/tis/hive/TestHiveserver2DataSourceFactory.java b/tis-datax/tis-hive-plugin/src/test/java/com/qlangtech/tis/hive/TestHiveserver2DataSourceFactory.java index 603c1a392..f6dcddd2f 100644 --- a/tis-datax/tis-hive-plugin/src/test/java/com/qlangtech/tis/hive/TestHiveserver2DataSourceFactory.java +++ b/tis-datax/tis-hive-plugin/src/test/java/com/qlangtech/tis/hive/TestHiveserver2DataSourceFactory.java @@ -46,7 +46,7 @@ public void testCreateConnWithKerberOs() throws Exception { UserToken userToken = createKerberToken(); // UserToken userToken = new Kerber; - try (DataSourceMeta.JDBCConnection conn = Hms.createConnection(jdbcUrl, userToken)) { + try (JDBCConnection conn = Hms.createConnection(jdbcUrl, userToken)) { Assert.assertNotNull(conn); conn.query("show tables", (result) -> { diff --git a/tis-incr/tis-chunjun-base-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/chunjun/sink/TISJdbcOutputFormat.java b/tis-incr/tis-chunjun-base-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/chunjun/sink/TISJdbcOutputFormat.java index 5eaf98849..7766040ca 100644 --- a/tis-incr/tis-chunjun-base-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/chunjun/sink/TISJdbcOutputFormat.java +++ b/tis-incr/tis-chunjun-base-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/chunjun/sink/TISJdbcOutputFormat.java @@ -21,7 +21,7 @@ import com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat; import com.dtstack.chunjun.connector.jdbc.sink.SinkColMetas; import com.qlangtech.tis.plugin.ds.DataSourceFactory; -import com.qlangtech.tis.plugin.ds.DataSourceMeta; +import com.qlangtech.tis.plugin.ds.JDBCConnection; import com.qlangtech.tis.plugins.incr.flink.chunjun.common.DialectUtils; import java.sql.Connection; @@ -53,7 +53,7 @@ protected final void initializeRowConverter() { @Override protected Connection getConnection() throws SQLException { DataSourceFactory dsFactory = Objects.requireNonNull(this.dsFactory, "dsFactory can not be null"); - DataSourceMeta.JDBCConnection conn = dsFactory.getConnection(this.jdbcConf.getJdbcUrl(), false); + JDBCConnection conn = dsFactory.getConnection(this.jdbcConf.getJdbcUrl(), false); return conn.getConnection(); } } diff --git a/tis-incr/tis-chunjun-base-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/connector/ChunjunSinkFactory.java b/tis-incr/tis-chunjun-base-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/connector/ChunjunSinkFactory.java index d1b7a2c38..b8c8e8be8 100644 --- a/tis-incr/tis-chunjun-base-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/connector/ChunjunSinkFactory.java +++ b/tis-incr/tis-chunjun-base-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/connector/ChunjunSinkFactory.java @@ -72,6 +72,7 @@ import com.qlangtech.tis.plugin.ds.DataSourceMeta; import com.qlangtech.tis.plugin.ds.IColMetaGetter; import com.qlangtech.tis.plugin.ds.ISelectedTab; +import com.qlangtech.tis.plugin.ds.JDBCConnection; import com.qlangtech.tis.plugin.incr.ISelectedTabExtendFactory; import com.qlangtech.tis.plugins.incr.flink.cdc.AbstractRowDataMapper; import com.qlangtech.tis.plugins.incr.flink.chunjun.common.ColMetaUtils; @@ -508,7 +509,7 @@ protected DataStreamSink createOutput( DataStream dataSet, OutputFormat outputFormat) { JdbcOutputFormat routputFormat = (JdbcOutputFormat) outputFormat; - try (DataSourceMeta.JDBCConnection conn = dsFactory.getConnection(jdbcUrl, false)) { + try (JDBCConnection conn = dsFactory.getConnection(jdbcUrl, false)) { routputFormat.dbConn = conn.getConnection(); routputFormat.initColumnList(); } catch (SQLException e) { diff --git a/tis-incr/tis-flink-cdc-oracle-plugin/src/test/java/com/qlangtech/plugins/incr/flink/cdc/oracle/TestTISFlinkCDCOracleSourceFunction.java b/tis-incr/tis-flink-cdc-oracle-plugin/src/test/java/com/qlangtech/plugins/incr/flink/cdc/oracle/TestTISFlinkCDCOracleSourceFunction.java index a0d3b984a..7d498567a 100644 --- a/tis-incr/tis-flink-cdc-oracle-plugin/src/test/java/com/qlangtech/plugins/incr/flink/cdc/oracle/TestTISFlinkCDCOracleSourceFunction.java +++ b/tis-incr/tis-flink-cdc-oracle-plugin/src/test/java/com/qlangtech/plugins/incr/flink/cdc/oracle/TestTISFlinkCDCOracleSourceFunction.java @@ -29,6 +29,7 @@ import com.qlangtech.tis.manage.common.CenterResource; import com.qlangtech.tis.plugin.ds.BasicDataSourceFactory; import com.qlangtech.tis.plugin.ds.DataSourceMeta; +import com.qlangtech.tis.plugin.ds.JDBCConnection; import org.apache.flink.test.util.AbstractTestBase; import org.junit.*; import org.junit.rules.TestRule; @@ -86,7 +87,7 @@ protected BasicDataSourceFactory createDataSourceFactory(TargetResName dataxName } @Override - protected void startProcessConn(DataSourceMeta.JDBCConnection conn) throws SQLException { + protected void startProcessConn(JDBCConnection conn) throws SQLException { conn.getConnection().setAutoCommit(false); } diff --git a/tis-incr/tis-flink-cdc-postgresql-plugin/src/test/java/com/qlangtech/plugins/incr/flink/cdc/postgresql/TestFlinkCDCPostgreSQLSourceFunction.java b/tis-incr/tis-flink-cdc-postgresql-plugin/src/test/java/com/qlangtech/plugins/incr/flink/cdc/postgresql/TestFlinkCDCPostgreSQLSourceFunction.java index 895ba2ae5..74ea02a66 100644 --- a/tis-incr/tis-flink-cdc-postgresql-plugin/src/test/java/com/qlangtech/plugins/incr/flink/cdc/postgresql/TestFlinkCDCPostgreSQLSourceFunction.java +++ b/tis-incr/tis-flink-cdc-postgresql-plugin/src/test/java/com/qlangtech/plugins/incr/flink/cdc/postgresql/TestFlinkCDCPostgreSQLSourceFunction.java @@ -30,6 +30,7 @@ import com.qlangtech.tis.plugin.datax.common.BasicDataXRdbmsReader; import com.qlangtech.tis.plugin.ds.BasicDataSourceFactory; import com.qlangtech.tis.plugin.ds.DataSourceMeta; +import com.qlangtech.tis.plugin.ds.JDBCConnection; import com.qlangtech.tis.plugin.incr.TISSinkFactory; import com.qlangtech.tis.sql.parser.tuple.creator.EntityName; import org.junit.Assert; @@ -110,7 +111,7 @@ protected int executeStatement(Connection connection, java.sql.Statement stateme } @Override - protected void startProcessConn(DataSourceMeta.JDBCConnection conn) throws SQLException { + protected void startProcessConn(JDBCConnection conn) throws SQLException { super.startProcessConn(conn); conn.getConnection().setAutoCommit(false); } diff --git a/tis-incr/tis-flink-chunjun-clickhouse-plugin/src/main/java/com/qlangtech/plugins/incr/flink/chunjun/clickhouse/sink/TISClickhouseOutputFormat.java b/tis-incr/tis-flink-chunjun-clickhouse-plugin/src/main/java/com/qlangtech/plugins/incr/flink/chunjun/clickhouse/sink/TISClickhouseOutputFormat.java index dce8ca738..c26d1b5b8 100644 --- a/tis-incr/tis-flink-chunjun-clickhouse-plugin/src/main/java/com/qlangtech/plugins/incr/flink/chunjun/clickhouse/sink/TISClickhouseOutputFormat.java +++ b/tis-incr/tis-flink-chunjun-clickhouse-plugin/src/main/java/com/qlangtech/plugins/incr/flink/chunjun/clickhouse/sink/TISClickhouseOutputFormat.java @@ -23,6 +23,7 @@ import com.qlangtech.tis.plugin.ds.DataSourceFactory; import com.qlangtech.tis.plugin.ds.DataSourceMeta; import com.qlangtech.tis.plugin.ds.IColMetaGetter; +import com.qlangtech.tis.plugin.ds.JDBCConnection; import com.qlangtech.tis.plugins.incr.flink.chunjun.common.DialectUtils; import java.sql.Connection; @@ -55,7 +56,7 @@ protected void initializeRowConverter() { @Override protected Connection getConnection() throws SQLException { DataSourceFactory dsFactory = Objects.requireNonNull(this.dsFactory, "dsFactory can not be null"); - DataSourceMeta.JDBCConnection connection = dsFactory.getConnection(this.jdbcConf.getJdbcUrl(), false); + JDBCConnection connection = dsFactory.getConnection(this.jdbcConf.getJdbcUrl(), false); return connection.getConnection(); } diff --git a/tis-incr/tis-flink-chunjun-clickhouse-plugin/src/test/java/com/qlangtech/plugins/incr/flink/chunjun/clickhouse/sink/TestChunjunClickhouseSinkFactory.java b/tis-incr/tis-flink-chunjun-clickhouse-plugin/src/test/java/com/qlangtech/plugins/incr/flink/chunjun/clickhouse/sink/TestChunjunClickhouseSinkFactory.java index 9b9551cf1..f42c788c6 100644 --- a/tis-incr/tis-flink-chunjun-clickhouse-plugin/src/test/java/com/qlangtech/plugins/incr/flink/chunjun/clickhouse/sink/TestChunjunClickhouseSinkFactory.java +++ b/tis-incr/tis-flink-chunjun-clickhouse-plugin/src/test/java/com/qlangtech/plugins/incr/flink/chunjun/clickhouse/sink/TestChunjunClickhouseSinkFactory.java @@ -276,7 +276,7 @@ public ClickHouseDataSourceFactory getDataSourceFactory() { }); try { - try (DataSourceMeta.JDBCConnection conn = sourceFactory.getConnection(jdbcUrls[0], false)) { + try (JDBCConnection conn = sourceFactory.getConnection(jdbcUrls[0], false)) { Statement statement = conn.createStatement(); //+ " where id='" + colIdVal + "'" ResultSet resultSet = statement.executeQuery("select * from " + jdbcUrls[1] + "." + tableName); diff --git a/tis-incr/tis-flink-chunjun-doris-plugin/src/test/java/com/qlangtech/plugins/incr/flink/chunjun/doris/sink/TestChunjunDorisSinkFactory.java b/tis-incr/tis-flink-chunjun-doris-plugin/src/test/java/com/qlangtech/plugins/incr/flink/chunjun/doris/sink/TestChunjunDorisSinkFactory.java index 14f8d86df..026319f1f 100644 --- a/tis-incr/tis-flink-chunjun-doris-plugin/src/test/java/com/qlangtech/plugins/incr/flink/chunjun/doris/sink/TestChunjunDorisSinkFactory.java +++ b/tis-incr/tis-flink-chunjun-doris-plugin/src/test/java/com/qlangtech/plugins/incr/flink/chunjun/doris/sink/TestChunjunDorisSinkFactory.java @@ -142,7 +142,7 @@ public static void initializeDorisDB() throws Exception { String colName = null; dsFactory = getDorisSourceFactory(feServiceHost, jdbcPort, loadPort); - try (DataSourceMeta.JDBCConnection conn = dsFactory.getConnection( + try (JDBCConnection conn = dsFactory.getConnection( dsFactory.buidJdbcUrl(null, feServiceHost, null), false)) { try (Statement statement = conn.createStatement()) { diff --git a/tis-incr/tis-flink-extends/src/main/java/com/qlangtech/tis/realtime/BasicFlinkSourceHandle.java b/tis-incr/tis-flink-extends/src/main/java/com/qlangtech/tis/realtime/BasicFlinkSourceHandle.java index c2493e6fd..ddfca64bf 100644 --- a/tis-incr/tis-flink-extends/src/main/java/com/qlangtech/tis/realtime/BasicFlinkSourceHandle.java +++ b/tis-incr/tis-flink-extends/src/main/java/com/qlangtech/tis/realtime/BasicFlinkSourceHandle.java @@ -107,10 +107,6 @@ public JobExecutionResult consume(TargetResName dataxName, AsyncMsg