diff --git a/tis-datax/tis-datax-cassandra-plugin/src/main/java/com/qlangtech/tis/plugin/ds/cassandra/CassandraDatasourceFactory.java b/tis-datax/tis-datax-cassandra-plugin/src/main/java/com/qlangtech/tis/plugin/ds/cassandra/CassandraDatasourceFactory.java index 748ac037c..c2222c4a7 100644 --- a/tis-datax/tis-datax-cassandra-plugin/src/main/java/com/qlangtech/tis/plugin/ds/cassandra/CassandraDatasourceFactory.java +++ b/tis-datax/tis-datax-cassandra-plugin/src/main/java/com/qlangtech/tis/plugin/ds/cassandra/CassandraDatasourceFactory.java @@ -30,6 +30,7 @@ import com.qlangtech.tis.plugin.annotation.FormFieldType; import com.qlangtech.tis.plugin.annotation.Validator; import com.qlangtech.tis.plugin.ds.*; + import com.qlangtech.tis.runtime.module.misc.IFieldErrorHandler; import com.qlangtech.tis.sql.parser.tuple.creator.EntityName; import org.apache.commons.lang.StringUtils; @@ -85,7 +86,7 @@ public DBConfig getDbConfig() { public DataDumpers getDataDumpers(TISTable table) { List jdbcUrls = Lists.newArrayList(); for (String host : this.getHosts()) { - jdbcUrls.add(host); + jdbcUrls.add((host)); } return DataDumpers.create(jdbcUrls, table); } @@ -217,7 +218,7 @@ public String[] getHosts() { @Override - public JDBCConnection createConnection(String jdbcUrl,boolean verify) throws SQLException { + public JDBCConnection createConnection(String jdbcUrl, boolean verify) throws SQLException { throw new UnsupportedOperationException(); } diff --git a/tis-datax/tis-datax-clickhouse-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXClickhouseWriter.java b/tis-datax/tis-datax-clickhouse-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXClickhouseWriter.java index 2e19df87d..42c757a55 100644 --- a/tis-datax/tis-datax-clickhouse-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXClickhouseWriter.java +++ b/tis-datax/tis-datax-clickhouse-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXClickhouseWriter.java @@ -32,6 +32,7 @@ import com.qlangtech.tis.plugin.datax.common.BasicDataXRdbmsWriter; import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules; import com.qlangtech.tis.plugin.ds.CMeta; + import com.qlangtech.tis.plugin.ds.DataType; import com.qlangtech.tis.plugin.ds.IColMetaGetter; import com.qlangtech.tis.plugin.ds.clickhouse.ClickHouseDataSourceFactory; diff --git a/tis-datax/tis-datax-clickhouse-plugin/src/main/java/com/qlangtech/tis/plugin/ds/clickhouse/ClickHouseDataSourceFactory.java b/tis-datax/tis-datax-clickhouse-plugin/src/main/java/com/qlangtech/tis/plugin/ds/clickhouse/ClickHouseDataSourceFactory.java index 56976bb29..c686e47e5 100644 --- a/tis-datax/tis-datax-clickhouse-plugin/src/main/java/com/qlangtech/tis/plugin/ds/clickhouse/ClickHouseDataSourceFactory.java +++ b/tis-datax/tis-datax-clickhouse-plugin/src/main/java/com/qlangtech/tis/plugin/ds/clickhouse/ClickHouseDataSourceFactory.java @@ -25,6 +25,7 @@ import com.qlangtech.tis.plugin.ds.BasicDataSourceFactory; import com.qlangtech.tis.plugin.ds.ColumnMetaData; import com.qlangtech.tis.plugin.ds.DBConfig; + import com.qlangtech.tis.plugin.ds.DataType; import com.qlangtech.tis.plugin.ds.JDBCConnection; import com.qlangtech.tis.plugin.ds.TableInDB; diff --git a/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/BasicDataXRdbmsWriter.java b/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/BasicDataXRdbmsWriter.java index d67783d34..a9e0e1dd1 100644 --- a/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/BasicDataXRdbmsWriter.java +++ b/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/BasicDataXRdbmsWriter.java @@ -36,6 +36,7 @@ import com.qlangtech.tis.plugin.annotation.Validator; import com.qlangtech.tis.plugin.ds.BasicDataSourceFactory; import com.qlangtech.tis.plugin.ds.ColumnMetaData; + import com.qlangtech.tis.plugin.ds.DataSourceFactory; import com.qlangtech.tis.plugin.ds.DataSourceMeta; import com.qlangtech.tis.plugin.ds.IDataSourceFactoryGetter; @@ -163,7 +164,7 @@ private static void process(String dataXName, BasicDataXRdbmsWriter getTableMetadata(JDBCConnection conn, boolean inSink private List parseTableColMeta(EntityName table, boolean inSink, DBConfig config, String ip, String dbname) throws Exception { // List columns = Lists.newArrayList(); String jdbcUrl = buidJdbcUrl(config, ip, dbname); - - return parseTableColMeta(inSink, table, jdbcUrl); + return parseTableColMeta(inSink, table, (jdbcUrl)); } @@ -306,11 +306,7 @@ private void visitConnection(DBConfig db, String ip, String dbName //Connection conn = null; String jdbcUrl = buidJdbcUrl(db, ip, dbName); try { - - - JDBCConnectionFactory connectionFactory = (url, verify) -> getConnection(url, verify); - - validateConnection(connectionFactory, jdbcUrl, p); + validateConnection((jdbcUrl), p); } catch (TisException e) { throw e; } catch (Exception e) { diff --git a/tis-datax/tis-datax-dameng-plugin/src/main/java/com/qlangtech/tis/plugin/datax/dameng/ds/DaMengDataSourceFactory.java b/tis-datax/tis-datax-dameng-plugin/src/main/java/com/qlangtech/tis/plugin/datax/dameng/ds/DaMengDataSourceFactory.java index 09c594ac6..1a02ccea2 100644 --- a/tis-datax/tis-datax-dameng-plugin/src/main/java/com/qlangtech/tis/plugin/datax/dameng/ds/DaMengDataSourceFactory.java +++ b/tis-datax/tis-datax-dameng-plugin/src/main/java/com/qlangtech/tis/plugin/datax/dameng/ds/DaMengDataSourceFactory.java @@ -10,6 +10,7 @@ import com.qlangtech.tis.plugin.ds.BasicDataSourceFactory; import com.qlangtech.tis.plugin.ds.ColumnMetaData; import com.qlangtech.tis.plugin.ds.DBConfig; + import com.qlangtech.tis.plugin.ds.DataDumpers; import com.qlangtech.tis.plugin.ds.DataSourceFactory; import com.qlangtech.tis.plugin.ds.DataType; @@ -280,7 +281,7 @@ public boolean hasNext() { @Override public IDataSourceDumper next() { - final String jdbcUrl = jdbcUrls.get(index.getAndIncrement()); + final String jdbcUrl = jdbcUrls.get(index.getAndIncrement()); return new MySqlDataSourceDumper(jdbcUrl, table); } }; @@ -303,7 +304,7 @@ public String getDBSchema() { } private class MySqlDataSourceDumper implements IDataSourceDumper { - private final String jdbcUrl; + private final String jdbcUrl; private final TISTable table; private JDBCConnection connection; diff --git a/tis-datax/tis-datax-dameng-plugin/src/test/java/com/qlangtech/tis/plugin/datax/dameng/writer/TestDataXDaMengWriterReal.java b/tis-datax/tis-datax-dameng-plugin/src/test/java/com/qlangtech/tis/plugin/datax/dameng/writer/TestDataXDaMengWriterReal.java index b9bac10f2..f580c42a2 100644 --- a/tis-datax/tis-datax-dameng-plugin/src/test/java/com/qlangtech/tis/plugin/datax/dameng/writer/TestDataXDaMengWriterReal.java +++ b/tis-datax/tis-datax-dameng-plugin/src/test/java/com/qlangtech/tis/plugin/datax/dameng/writer/TestDataXDaMengWriterReal.java @@ -2,6 +2,7 @@ import com.alibaba.datax.plugin.writer.hdfswriter.HdfsColMeta; import com.google.common.collect.Lists; +import com.qlangtech.tis.datax.DataXCfgFile; import com.qlangtech.tis.datax.IDataxProcessor; import com.qlangtech.tis.datax.impl.DataxProcessor; import com.qlangtech.tis.datax.impl.DataxWriter; @@ -81,7 +82,7 @@ public void testRealDump() throws Exception { TestDataXDaMengWriter.setPlaceholderReader(); DataXCfgJson wjson = DataXCfgJson.content(TestDataXDaMengWriter.generateDataXCfg(writer, Optional.of(tabMap))); - CreateTableSqlBuilder.CreateDDL ddl = writer.generateCreateDDL(tabMap); + CreateTableSqlBuilder.CreateDDL ddl = writer.generateCreateDDL(tabMap, Optional.empty()); DataxProcessor dataXProcessor = EasyMock.mock("dataXProcessor", DataxProcessor.class); File createDDLDir = folder.newFolder();// new File("."); diff --git a/tis-datax/tis-datax-doris-plugin/src/main/java/com/qlangtech/tis/plugin/ds/doris/DorisSourceFactory.java b/tis-datax/tis-datax-doris-plugin/src/main/java/com/qlangtech/tis/plugin/ds/doris/DorisSourceFactory.java index 939e395d3..5e56c5575 100644 --- a/tis-datax/tis-datax-doris-plugin/src/main/java/com/qlangtech/tis/plugin/ds/doris/DorisSourceFactory.java +++ b/tis-datax/tis-datax-doris-plugin/src/main/java/com/qlangtech/tis/plugin/ds/doris/DorisSourceFactory.java @@ -34,6 +34,7 @@ import com.qlangtech.tis.plugin.ds.BasicDataSourceFactory; import com.qlangtech.tis.plugin.ds.ColumnMetaData; import com.qlangtech.tis.plugin.ds.DBConfig; + import com.qlangtech.tis.plugin.ds.DataType; import com.qlangtech.tis.plugin.ds.JDBCConnection; import com.qlangtech.tis.plugin.ds.JDBCTypes; diff --git a/tis-datax/tis-datax-odps-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXOdpsWriter.java b/tis-datax/tis-datax-odps-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXOdpsWriter.java index c9bc2f432..4b1dc1472 100644 --- a/tis-datax/tis-datax-odps-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXOdpsWriter.java +++ b/tis-datax/tis-datax-odps-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXOdpsWriter.java @@ -40,6 +40,7 @@ import com.qlangtech.tis.plugin.datax.odps.OdpsDataSourceFactory; import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules; import com.qlangtech.tis.plugin.ds.CMeta; + import com.qlangtech.tis.plugin.ds.DataSourceMeta; import com.qlangtech.tis.plugin.ds.DataType; import com.qlangtech.tis.plugin.ds.IColMetaGetter; @@ -342,7 +343,7 @@ public JDBCConnection getConnection() { try { return dsFactory.getConnection(jdbcUrl, false); } catch (SQLException e) { - throw new RuntimeException(jdbcUrl, e); + throw new RuntimeException(String.valueOf(jdbcUrl), e); } } @@ -376,7 +377,7 @@ public static class OdpsContext implements IDataxContext { public OdpsContext(DataXOdpsWriter odpsWriter, IDataxProcessor.TableMap tableMapper, Optional transformerRules) { this.odpsWriter = odpsWriter; this.tableMapper = tableMapper; - this.cols = TabCols.create( null, tableMapper, transformerRules).getRawCols(); + this.cols = TabCols.create(null, tableMapper, transformerRules).getRawCols(); this.dsFactory = odpsWriter.getDataSourceFactory(); this.accessKey = this.dsFactory.getAccessKey(); diff --git a/tis-datax/tis-datax-odps-plugin/src/main/java/com/qlangtech/tis/plugin/datax/odps/OdpsDataSourceFactory.java b/tis-datax/tis-datax-odps-plugin/src/main/java/com/qlangtech/tis/plugin/datax/odps/OdpsDataSourceFactory.java index 9d4cd7638..f7ffcdeaa 100644 --- a/tis-datax/tis-datax-odps-plugin/src/main/java/com/qlangtech/tis/plugin/datax/odps/OdpsDataSourceFactory.java +++ b/tis-datax/tis-datax-odps-plugin/src/main/java/com/qlangtech/tis/plugin/datax/odps/OdpsDataSourceFactory.java @@ -40,6 +40,7 @@ import com.qlangtech.tis.plugin.ds.BasicDataSourceFactory; import com.qlangtech.tis.plugin.ds.ColumnMetaData; import com.qlangtech.tis.plugin.ds.DBConfig; + import com.qlangtech.tis.plugin.ds.DataSourceMeta; import com.qlangtech.tis.plugin.ds.JDBCConnection; import com.qlangtech.tis.plugin.ds.TableNotFoundException; diff --git a/tis-datax/tis-ds-mysql-plugin/src/main/java/com/qlangtech/tis/plugin/ds/mysql/MySQLDataSourceFactory.java b/tis-datax/tis-ds-mysql-plugin/src/main/java/com/qlangtech/tis/plugin/ds/mysql/MySQLDataSourceFactory.java index f31c37993..ce1fd8579 100644 --- a/tis-datax/tis-ds-mysql-plugin/src/main/java/com/qlangtech/tis/plugin/ds/mysql/MySQLDataSourceFactory.java +++ b/tis-datax/tis-ds-mysql-plugin/src/main/java/com/qlangtech/tis/plugin/ds/mysql/MySQLDataSourceFactory.java @@ -30,6 +30,7 @@ import com.qlangtech.tis.plugin.ds.BasicDataSourceFactory; import com.qlangtech.tis.plugin.ds.ColumnMetaData; import com.qlangtech.tis.plugin.ds.DBConfig; + import com.qlangtech.tis.plugin.ds.DataDumpers; import com.qlangtech.tis.plugin.ds.DataType; import com.qlangtech.tis.plugin.ds.FacadeDataSource; @@ -271,7 +272,7 @@ public boolean hasNext() { public IDataSourceDumper next() { int idx; final String jdbcUrl = jdbcUrls.get(idx = index.getAndIncrement()); - if (StringUtils.isEmpty(jdbcUrl)) { + if (jdbcUrl == null) { throw new IllegalStateException("jdbcUrl can not be empty,jdbcUrls.size:" + length + ",applyIdx:" + idx + ",jdbcUrls:" + jdbcUrls.stream().map((url) -> "[" + url + "]").collect(Collectors.joining(","))); diff --git a/tis-datax/tis-hive-flat-table-builder-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXHiveWriter.java b/tis-datax/tis-hive-flat-table-builder-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXHiveWriter.java index ca46fc89e..6c9c38e24 100644 --- a/tis-datax/tis-hive-flat-table-builder-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXHiveWriter.java +++ b/tis-datax/tis-hive-flat-table-builder-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXHiveWriter.java @@ -61,6 +61,8 @@ import com.qlangtech.tis.plugin.datax.common.BasicDataXRdbmsWriter; import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules; import com.qlangtech.tis.plugin.ds.CMeta; +import com.qlangtech.tis.plugin.ds.DBConfig; + import com.qlangtech.tis.plugin.ds.DataSourceMeta; import com.qlangtech.tis.plugin.ds.IColMetaGetter; import com.qlangtech.tis.plugin.ds.IDataSourceFactoryGetter; @@ -236,7 +238,7 @@ public JDBCConnection getConnection() { Hiveserver2DataSourceFactory dsFactory = getDataSourceFactory(); String jdbcUrl = dsFactory.getJdbcUrl(); try { - return dsFactory.getConnection(jdbcUrl, false); + return dsFactory.getConnection((jdbcUrl), false); } catch (SQLException e) { throw new RuntimeException(jdbcUrl, e); } diff --git a/tis-datax/tis-hive-plugin/src/main/java/com/qlangtech/tis/hive/Hiveserver2DataSourceFactory.java b/tis-datax/tis-hive-plugin/src/main/java/com/qlangtech/tis/hive/Hiveserver2DataSourceFactory.java index 7eded9860..5915f230d 100644 --- a/tis-datax/tis-hive-plugin/src/main/java/com/qlangtech/tis/hive/Hiveserver2DataSourceFactory.java +++ b/tis-datax/tis-hive-plugin/src/main/java/com/qlangtech/tis/hive/Hiveserver2DataSourceFactory.java @@ -31,6 +31,7 @@ import com.qlangtech.tis.plugin.ds.BasicDataSourceFactory; import com.qlangtech.tis.plugin.ds.ColumnMetaData; import com.qlangtech.tis.plugin.ds.DBConfig; + import com.qlangtech.tis.plugin.ds.DataSourceFactory; import com.qlangtech.tis.plugin.ds.JDBCConnection; import com.qlangtech.tis.plugin.ds.JdbcUrlBuilder; @@ -131,7 +132,7 @@ public UserToken getUserToken() { @Override public JDBCConnection createConnection(String jdbcUrl, boolean verify) throws SQLException { - return getConnection(jdbcUrl, false, verify); + return getConnection((jdbcUrl), false, verify); } @Override @@ -158,7 +159,7 @@ public DBConfig getDbConfig() { @Override public void visitFirstConnection(IConnProcessor connProcessor) { final String hiveJdbcUrl = createHiveJdbcUrl(); - try (JDBCConnection conn = this.getConnection(hiveJdbcUrl, false)) { + try (JDBCConnection conn = this.getConnection((hiveJdbcUrl), false)) { connProcessor.vist(conn); } catch (Exception e) { throw new RuntimeException(e); diff --git a/tis-incr/tis-chunjun-base-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/chunjun/sink/TISJdbcOutputFormat.java b/tis-incr/tis-chunjun-base-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/chunjun/sink/TISJdbcOutputFormat.java index 7766040ca..247ecbc99 100644 --- a/tis-incr/tis-chunjun-base-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/chunjun/sink/TISJdbcOutputFormat.java +++ b/tis-incr/tis-chunjun-base-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/chunjun/sink/TISJdbcOutputFormat.java @@ -20,6 +20,7 @@ import com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat; import com.dtstack.chunjun.connector.jdbc.sink.SinkColMetas; + import com.qlangtech.tis.plugin.ds.DataSourceFactory; import com.qlangtech.tis.plugin.ds.JDBCConnection; import com.qlangtech.tis.plugins.incr.flink.chunjun.common.DialectUtils; @@ -53,7 +54,7 @@ protected final void initializeRowConverter() { @Override protected Connection getConnection() throws SQLException { DataSourceFactory dsFactory = Objects.requireNonNull(this.dsFactory, "dsFactory can not be null"); - JDBCConnection conn = dsFactory.getConnection(this.jdbcConf.getJdbcUrl(), false); + JDBCConnection conn = dsFactory.getConnection((this.jdbcConf.getJdbcUrl()), false); return conn.getConnection(); } } diff --git a/tis-incr/tis-chunjun-base-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/connector/ChunjunSinkFactory.java b/tis-incr/tis-chunjun-base-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/connector/ChunjunSinkFactory.java index b8c8e8be8..c50f4353b 100644 --- a/tis-incr/tis-chunjun-base-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/connector/ChunjunSinkFactory.java +++ b/tis-incr/tis-chunjun-base-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/connector/ChunjunSinkFactory.java @@ -68,6 +68,7 @@ import com.qlangtech.tis.plugin.ds.BasicDataSourceFactory; import com.qlangtech.tis.plugin.ds.CMeta; import com.qlangtech.tis.plugin.ds.DBConfig; + import com.qlangtech.tis.plugin.ds.DataSourceFactory; import com.qlangtech.tis.plugin.ds.DataSourceMeta; import com.qlangtech.tis.plugin.ds.IColMetaGetter; @@ -365,7 +366,7 @@ private void setUniqueKeyParams(List uniqueKey, Map para * @see JdbcSinkFactory */ private CreateChunjunSinkFunctionResult createSinkFunction( - String dbName, final String targetTabName, SelectedTab tab, String jdbcUrl + String dbName, final String targetTabName, SelectedTab tab, String jdbcUrl , BasicDataSourceFactory dsFactory, BasicDataXRdbmsWriter dataXWriter) { diff --git a/tis-incr/tis-flink-chunjun-doris-plugin/src/main/java/com/qlangtech/plugins/incr/flink/chunjun/doris/sink/ChunjunDorisSinkFactory.java b/tis-incr/tis-flink-chunjun-doris-plugin/src/main/java/com/qlangtech/plugins/incr/flink/chunjun/doris/sink/ChunjunDorisSinkFactory.java index 0e039407f..7410a8426 100644 --- a/tis-incr/tis-flink-chunjun-doris-plugin/src/main/java/com/qlangtech/plugins/incr/flink/chunjun/doris/sink/ChunjunDorisSinkFactory.java +++ b/tis-incr/tis-flink-chunjun-doris-plugin/src/main/java/com/qlangtech/plugins/incr/flink/chunjun/doris/sink/ChunjunDorisSinkFactory.java @@ -49,6 +49,7 @@ import com.qlangtech.tis.plugin.datax.doris.DorisSelectedTab; import com.qlangtech.tis.plugin.ds.BasicDataSourceFactory; import com.qlangtech.tis.plugin.ds.CMeta; + import com.qlangtech.tis.plugin.ds.DataSourceFactory; import com.qlangtech.tis.plugin.ds.doris.DorisSourceFactory; import com.qlangtech.tis.plugins.incr.flink.chunjun.sink.SinkTabPropsExtends; diff --git a/tis-incr/tis-flink-chunjun-mysql-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/connector/sink/TISMysqlOutputFormat.java b/tis-incr/tis-flink-chunjun-mysql-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/connector/sink/TISMysqlOutputFormat.java index e0d7980b4..f6c63e27e 100644 --- a/tis-incr/tis-flink-chunjun-mysql-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/connector/sink/TISMysqlOutputFormat.java +++ b/tis-incr/tis-flink-chunjun-mysql-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/connector/sink/TISMysqlOutputFormat.java @@ -20,6 +20,8 @@ import com.dtstack.chunjun.connector.jdbc.sink.SinkColMetas; import com.dtstack.chunjun.connector.mysql.sink.MysqlOutputFormat; +import com.qlangtech.tis.plugin.ds.DBConfig; + import com.qlangtech.tis.plugin.ds.DataSourceFactory; import com.qlangtech.tis.plugins.incr.flink.chunjun.common.DialectUtils; @@ -46,7 +48,7 @@ public TISMysqlOutputFormat(DataSourceFactory dsFactory, SinkColMetas cols) { @Override protected Connection getConnection() throws SQLException { DataSourceFactory dsFactory = Objects.requireNonNull(this.dsFactory, "dsFactory can not be null"); - return dsFactory.getConnection(this.jdbcConf.getJdbcUrl(), false).getConnection(); + return dsFactory.getConnection((this.jdbcConf.getJdbcUrl()), false).getConnection(); } @Override diff --git a/tis-incr/tis-flink-chunjun-mysql-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/connector/source/TISMysqlInputFormat.java b/tis-incr/tis-flink-chunjun-mysql-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/connector/source/TISMysqlInputFormat.java index 4cc1b2eb1..ae709a2a8 100644 --- a/tis-incr/tis-flink-chunjun-mysql-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/connector/source/TISMysqlInputFormat.java +++ b/tis-incr/tis-flink-chunjun-mysql-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/connector/source/TISMysqlInputFormat.java @@ -23,6 +23,8 @@ import com.dtstack.chunjun.connector.mysql.source.MysqlInputFormat; import com.dtstack.chunjun.converter.IDeserializationConverter; import com.dtstack.chunjun.element.column.BigDecimalColumn; +import com.qlangtech.tis.plugin.ds.DBConfig; + import com.qlangtech.tis.plugin.ds.DataSourceFactory; import com.qlangtech.tis.plugin.ds.IColMetaGetter; import com.qlangtech.tis.plugins.incr.flink.chunjun.common.DialectUtils; @@ -52,7 +54,7 @@ public TISMysqlInputFormat(DataSourceFactory dataSourceFactory, List> asyncMsg , IDataxProcessor dataXProcessor) throws Exception { - StreamExecutionEnvironment env = getFlinkExecutionEnvironment(); - if (CollectionUtils.isEmpty(asyncMsg.getFocusTabs())) { - throw new IllegalArgumentException("focusTabs can not be empty"); - } + try (DefaultJDBCConnectionPool connectionPool = new DefaultJDBCConnectionPool()) { + JDBCConnection.connectionPool.set(connectionPool); + StreamExecutionEnvironment env = getFlinkExecutionEnvironment(); + + if (CollectionUtils.isEmpty(asyncMsg.getFocusTabs())) { + throw new IllegalArgumentException("focusTabs can not be empty"); + } - Tab2OutputTag tab2OutputTag = createTab2OutputTag(asyncMsg, env, dataxName); - Map> sinks = createTabSinkFunc(dataXProcessor); - // CountDownLatch countDown = new CountDownLatch(1); + Tab2OutputTag tab2OutputTag = createTab2OutputTag(asyncMsg, env, dataxName); + Map> sinks = createTabSinkFunc(dataXProcessor); + // CountDownLatch countDown = new CountDownLatch(1); - this.processTableStream(env, tab2OutputTag, new SinkFuncs(sinks)); - return executeFlinkJob(dataxName, env); + this.processTableStream(env, tab2OutputTag, new SinkFuncs(sinks)); + return executeFlinkJob(dataxName, env); + } finally { + JDBCConnection.connectionPool.remove(); + } } protected Map> createTabSinkFunc( diff --git a/tis-incr/tis-flink-extends/src/main/java/com/qlangtech/tis/realtime/DefaultJDBCConnectionPool.java b/tis-incr/tis-flink-extends/src/main/java/com/qlangtech/tis/realtime/DefaultJDBCConnectionPool.java new file mode 100644 index 000000000..0fbe7c1bb --- /dev/null +++ b/tis-incr/tis-flink-extends/src/main/java/com/qlangtech/tis/realtime/DefaultJDBCConnectionPool.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.qlangtech.tis.realtime; + + +import com.qlangtech.tis.plugin.ds.JDBCConnection; +import com.qlangtech.tis.plugin.ds.JDBCConnectionPool; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; + +/** + * @author: 百岁(baisui@qlangtech.com) + * @create: 2024-10-05 22:14 + **/ +public class DefaultJDBCConnectionPool extends JDBCConnectionPool implements AutoCloseable { + private static final Logger logger = LoggerFactory.getLogger(DefaultJDBCConnectionPool.class); + + private final Map connectionCache = new HashMap<>(); + + public DefaultJDBCConnectionPool() { + } + + @Override + public JDBCConnection getConnection(String jdbcUrl, boolean verify) { + return connectionCache.get(jdbcUrl); + } + + @Override + public JDBCConnection setConnection(String jdbcUrl, boolean verify, JDBCConnection conn) { + JDBCConnection newConn = new JDBCConnection(conn.getConnection(), conn.getUrl()) { + @Override + public void close() throws SQLException { + + } + }; + connectionCache.put(jdbcUrl, newConn); + return newConn; + } + + @Override + public void close() throws Exception { + for (Map.Entry entry : connectionCache.entrySet()) { + try { + entry.getValue().getConnection().close(); + } catch (SQLException e) { + logger.warn(e.getMessage(), e); + } + } + } +} diff --git a/tis-k8s-plugin/src/main/java/com/qlangtech/tis/plugin/datax/powerjob/impl/coresource/DefaultPowerjobCoreDataSource.java b/tis-k8s-plugin/src/main/java/com/qlangtech/tis/plugin/datax/powerjob/impl/coresource/DefaultPowerjobCoreDataSource.java index ee7c89808..cd564b81a 100644 --- a/tis-k8s-plugin/src/main/java/com/qlangtech/tis/plugin/datax/powerjob/impl/coresource/DefaultPowerjobCoreDataSource.java +++ b/tis-k8s-plugin/src/main/java/com/qlangtech/tis/plugin/datax/powerjob/impl/coresource/DefaultPowerjobCoreDataSource.java @@ -17,6 +17,7 @@ import com.qlangtech.tis.plugin.datax.powerjob.K8SDataXPowerJobServer; import com.qlangtech.tis.plugin.datax.powerjob.PowerjobCoreDataSource; import com.qlangtech.tis.plugin.ds.DBConfig; + import com.qlangtech.tis.plugin.ds.DataSourceFactory; import com.qlangtech.tis.plugin.ds.JDBCConnection; import com.qlangtech.tis.plugin.ds.IDBAuthorizeTokenGetter; @@ -128,7 +129,7 @@ protected String getJdbcUrl(K8sImage image) { try { dbConfig.vistDbName(new DBConfig.IProcess() { @Override - public boolean visit(DBConfig config, String jdbcUrl, String ip, String dbName) throws Exception { + public boolean visit(DBConfig config, String jdbcUrl, String ip, String dbName) throws Exception { jdbcUrlRef.set(jdbcUrl); return true; }