Skip to content

Commit

Permalink
add connection pool support datavane/tis#366
Browse files Browse the repository at this point in the history
  • Loading branch information
baisui1981 committed Oct 6, 2024
1 parent 3febff2 commit 9b912e7
Show file tree
Hide file tree
Showing 22 changed files with 129 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.qlangtech.tis.plugin.annotation.FormFieldType;
import com.qlangtech.tis.plugin.annotation.Validator;
import com.qlangtech.tis.plugin.ds.*;

import com.qlangtech.tis.runtime.module.misc.IFieldErrorHandler;
import com.qlangtech.tis.sql.parser.tuple.creator.EntityName;
import org.apache.commons.lang.StringUtils;
Expand Down Expand Up @@ -85,7 +86,7 @@ public DBConfig getDbConfig() {
public DataDumpers getDataDumpers(TISTable table) {
List<String> jdbcUrls = Lists.newArrayList();
for (String host : this.getHosts()) {
jdbcUrls.add(host);
jdbcUrls.add((host));
}
return DataDumpers.create(jdbcUrls, table);
}
Expand Down Expand Up @@ -217,7 +218,7 @@ public String[] getHosts() {


@Override
public JDBCConnection createConnection(String jdbcUrl,boolean verify) throws SQLException {
public JDBCConnection createConnection(String jdbcUrl, boolean verify) throws SQLException {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.qlangtech.tis.plugin.datax.common.BasicDataXRdbmsWriter;
import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules;
import com.qlangtech.tis.plugin.ds.CMeta;

import com.qlangtech.tis.plugin.ds.DataType;
import com.qlangtech.tis.plugin.ds.IColMetaGetter;
import com.qlangtech.tis.plugin.ds.clickhouse.ClickHouseDataSourceFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.qlangtech.tis.plugin.ds.BasicDataSourceFactory;
import com.qlangtech.tis.plugin.ds.ColumnMetaData;
import com.qlangtech.tis.plugin.ds.DBConfig;

import com.qlangtech.tis.plugin.ds.DataType;
import com.qlangtech.tis.plugin.ds.JDBCConnection;
import com.qlangtech.tis.plugin.ds.TableInDB;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.qlangtech.tis.plugin.annotation.Validator;
import com.qlangtech.tis.plugin.ds.BasicDataSourceFactory;
import com.qlangtech.tis.plugin.ds.ColumnMetaData;

import com.qlangtech.tis.plugin.ds.DataSourceFactory;
import com.qlangtech.tis.plugin.ds.DataSourceMeta;
import com.qlangtech.tis.plugin.ds.IDataSourceFactoryGetter;
Expand Down Expand Up @@ -163,7 +164,7 @@ private static void process(String dataXName, BasicDataXRdbmsWriter<BasicDataSou
IDataxProcessor processor = DataxProcessor.load(null, StoreResourceType.DataApp, dataXName);
DataSourceFactory dsFactory = dataXWriter.getDataSourceFactory();
for (String jdbcUrl : jdbcUrls) {
try (JDBCConnection conn = dsFactory.getConnection(jdbcUrl, false)) {
try (JDBCConnection conn = dsFactory.getConnection((jdbcUrl), false)) {
process(dataXName, processor, dataXWriter, dataXWriter, conn, tableName);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.qlangtech.tis.datax.IDataxProcessor.TabCols;
import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules;
import com.qlangtech.tis.plugin.ds.BasicDataSourceFactory;

import org.apache.commons.lang.StringUtils;

import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.qlangtech.tis.plugin.annotation.FormField;
import com.qlangtech.tis.plugin.annotation.FormFieldType;
import com.qlangtech.tis.plugin.annotation.Validator;

import com.qlangtech.tis.runtime.module.misc.IFieldErrorHandler;
import com.qlangtech.tis.sql.parser.tuple.creator.EntityName;
import com.qlangtech.tis.util.IPluginContext;
Expand Down Expand Up @@ -143,8 +144,7 @@ public List<ColumnMetaData> getTableMetadata(JDBCConnection conn, boolean inSink
private List<ColumnMetaData> parseTableColMeta(EntityName table, boolean inSink, DBConfig config, String ip, String dbname) throws Exception {
// List<ColumnMetaData> columns = Lists.newArrayList();
String jdbcUrl = buidJdbcUrl(config, ip, dbname);

return parseTableColMeta(inSink, table, jdbcUrl);
return parseTableColMeta(inSink, table, (jdbcUrl));
}


Expand Down Expand Up @@ -306,11 +306,7 @@ private void visitConnection(DBConfig db, String ip, String dbName
//Connection conn = null;
String jdbcUrl = buidJdbcUrl(db, ip, dbName);
try {


JDBCConnectionFactory connectionFactory = (url, verify) -> getConnection(url, verify);

validateConnection(connectionFactory, jdbcUrl, p);
validateConnection((jdbcUrl), p);
} catch (TisException e) {
throw e;
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.qlangtech.tis.plugin.ds.BasicDataSourceFactory;
import com.qlangtech.tis.plugin.ds.ColumnMetaData;
import com.qlangtech.tis.plugin.ds.DBConfig;

import com.qlangtech.tis.plugin.ds.DataDumpers;
import com.qlangtech.tis.plugin.ds.DataSourceFactory;
import com.qlangtech.tis.plugin.ds.DataType;
Expand Down Expand Up @@ -280,7 +281,7 @@ public boolean hasNext() {

@Override
public IDataSourceDumper next() {
final String jdbcUrl = jdbcUrls.get(index.getAndIncrement());
final String jdbcUrl = jdbcUrls.get(index.getAndIncrement());
return new MySqlDataSourceDumper(jdbcUrl, table);
}
};
Expand All @@ -303,7 +304,7 @@ public String getDBSchema() {
}

private class MySqlDataSourceDumper implements IDataSourceDumper {
private final String jdbcUrl;
private final String jdbcUrl;
private final TISTable table;

private JDBCConnection connection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.alibaba.datax.plugin.writer.hdfswriter.HdfsColMeta;
import com.google.common.collect.Lists;
import com.qlangtech.tis.datax.DataXCfgFile;
import com.qlangtech.tis.datax.IDataxProcessor;
import com.qlangtech.tis.datax.impl.DataxProcessor;
import com.qlangtech.tis.datax.impl.DataxWriter;
Expand Down Expand Up @@ -81,7 +82,7 @@ public void testRealDump() throws Exception {
TestDataXDaMengWriter.setPlaceholderReader();

DataXCfgJson wjson = DataXCfgJson.content(TestDataXDaMengWriter.generateDataXCfg(writer, Optional.of(tabMap)));
CreateTableSqlBuilder.CreateDDL ddl = writer.generateCreateDDL(tabMap);
CreateTableSqlBuilder.CreateDDL ddl = writer.generateCreateDDL(tabMap, Optional.empty());

DataxProcessor dataXProcessor = EasyMock.mock("dataXProcessor", DataxProcessor.class);
File createDDLDir = folder.newFolder();// new File(".");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.qlangtech.tis.plugin.ds.BasicDataSourceFactory;
import com.qlangtech.tis.plugin.ds.ColumnMetaData;
import com.qlangtech.tis.plugin.ds.DBConfig;

import com.qlangtech.tis.plugin.ds.DataType;
import com.qlangtech.tis.plugin.ds.JDBCConnection;
import com.qlangtech.tis.plugin.ds.JDBCTypes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.qlangtech.tis.plugin.datax.odps.OdpsDataSourceFactory;
import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules;
import com.qlangtech.tis.plugin.ds.CMeta;

import com.qlangtech.tis.plugin.ds.DataSourceMeta;
import com.qlangtech.tis.plugin.ds.DataType;
import com.qlangtech.tis.plugin.ds.IColMetaGetter;
Expand Down Expand Up @@ -342,7 +343,7 @@ public JDBCConnection getConnection() {
try {
return dsFactory.getConnection(jdbcUrl, false);
} catch (SQLException e) {
throw new RuntimeException(jdbcUrl, e);
throw new RuntimeException(String.valueOf(jdbcUrl), e);
}
}

Expand Down Expand Up @@ -376,7 +377,7 @@ public static class OdpsContext implements IDataxContext {
public OdpsContext(DataXOdpsWriter odpsWriter, IDataxProcessor.TableMap tableMapper, Optional<RecordTransformerRules> transformerRules) {
this.odpsWriter = odpsWriter;
this.tableMapper = tableMapper;
this.cols = TabCols.create( null, tableMapper, transformerRules).getRawCols();
this.cols = TabCols.create(null, tableMapper, transformerRules).getRawCols();
this.dsFactory = odpsWriter.getDataSourceFactory();
this.accessKey = this.dsFactory.getAccessKey();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.qlangtech.tis.plugin.ds.BasicDataSourceFactory;
import com.qlangtech.tis.plugin.ds.ColumnMetaData;
import com.qlangtech.tis.plugin.ds.DBConfig;

import com.qlangtech.tis.plugin.ds.DataSourceMeta;
import com.qlangtech.tis.plugin.ds.JDBCConnection;
import com.qlangtech.tis.plugin.ds.TableNotFoundException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.qlangtech.tis.plugin.ds.BasicDataSourceFactory;
import com.qlangtech.tis.plugin.ds.ColumnMetaData;
import com.qlangtech.tis.plugin.ds.DBConfig;

import com.qlangtech.tis.plugin.ds.DataDumpers;
import com.qlangtech.tis.plugin.ds.DataType;
import com.qlangtech.tis.plugin.ds.FacadeDataSource;
Expand Down Expand Up @@ -271,7 +272,7 @@ public boolean hasNext() {
public IDataSourceDumper next() {
int idx;
final String jdbcUrl = jdbcUrls.get(idx = index.getAndIncrement());
if (StringUtils.isEmpty(jdbcUrl)) {
if (jdbcUrl == null) {
throw new IllegalStateException("jdbcUrl can not be empty,jdbcUrls.size:" + length
+ ",applyIdx:" + idx + ",jdbcUrls:"
+ jdbcUrls.stream().map((url) -> "[" + url + "]").collect(Collectors.joining(",")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
import com.qlangtech.tis.plugin.datax.common.BasicDataXRdbmsWriter;
import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules;
import com.qlangtech.tis.plugin.ds.CMeta;
import com.qlangtech.tis.plugin.ds.DBConfig;

import com.qlangtech.tis.plugin.ds.DataSourceMeta;
import com.qlangtech.tis.plugin.ds.IColMetaGetter;
import com.qlangtech.tis.plugin.ds.IDataSourceFactoryGetter;
Expand Down Expand Up @@ -236,7 +238,7 @@ public JDBCConnection getConnection() {
Hiveserver2DataSourceFactory dsFactory = getDataSourceFactory();
String jdbcUrl = dsFactory.getJdbcUrl();
try {
return dsFactory.getConnection(jdbcUrl, false);
return dsFactory.getConnection((jdbcUrl), false);
} catch (SQLException e) {
throw new RuntimeException(jdbcUrl, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.qlangtech.tis.plugin.ds.BasicDataSourceFactory;
import com.qlangtech.tis.plugin.ds.ColumnMetaData;
import com.qlangtech.tis.plugin.ds.DBConfig;

import com.qlangtech.tis.plugin.ds.DataSourceFactory;
import com.qlangtech.tis.plugin.ds.JDBCConnection;
import com.qlangtech.tis.plugin.ds.JdbcUrlBuilder;
Expand Down Expand Up @@ -131,7 +132,7 @@ public UserToken getUserToken() {

@Override
public JDBCConnection createConnection(String jdbcUrl, boolean verify) throws SQLException {
return getConnection(jdbcUrl, false, verify);
return getConnection((jdbcUrl), false, verify);
}

@Override
Expand All @@ -158,7 +159,7 @@ public DBConfig getDbConfig() {
@Override
public void visitFirstConnection(IConnProcessor connProcessor) {
final String hiveJdbcUrl = createHiveJdbcUrl();
try (JDBCConnection conn = this.getConnection(hiveJdbcUrl, false)) {
try (JDBCConnection conn = this.getConnection((hiveJdbcUrl), false)) {
connProcessor.vist(conn);
} catch (Exception e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat;
import com.dtstack.chunjun.connector.jdbc.sink.SinkColMetas;

import com.qlangtech.tis.plugin.ds.DataSourceFactory;
import com.qlangtech.tis.plugin.ds.JDBCConnection;
import com.qlangtech.tis.plugins.incr.flink.chunjun.common.DialectUtils;
Expand Down Expand Up @@ -53,7 +54,7 @@ protected final void initializeRowConverter() {
@Override
protected Connection getConnection() throws SQLException {
DataSourceFactory dsFactory = Objects.requireNonNull(this.dsFactory, "dsFactory can not be null");
JDBCConnection conn = dsFactory.getConnection(this.jdbcConf.getJdbcUrl(), false);
JDBCConnection conn = dsFactory.getConnection((this.jdbcConf.getJdbcUrl()), false);
return conn.getConnection();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import com.qlangtech.tis.plugin.ds.BasicDataSourceFactory;
import com.qlangtech.tis.plugin.ds.CMeta;
import com.qlangtech.tis.plugin.ds.DBConfig;

import com.qlangtech.tis.plugin.ds.DataSourceFactory;
import com.qlangtech.tis.plugin.ds.DataSourceMeta;
import com.qlangtech.tis.plugin.ds.IColMetaGetter;
Expand Down Expand Up @@ -365,7 +366,7 @@ private void setUniqueKeyParams(List<String> uniqueKey, Map<String, Object> para
* @see JdbcSinkFactory
*/
private CreateChunjunSinkFunctionResult createSinkFunction(
String dbName, final String targetTabName, SelectedTab tab, String jdbcUrl
String dbName, final String targetTabName, SelectedTab tab, String jdbcUrl
, BasicDataSourceFactory dsFactory, BasicDataXRdbmsWriter dataXWriter) {


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import com.qlangtech.tis.plugin.datax.doris.DorisSelectedTab;
import com.qlangtech.tis.plugin.ds.BasicDataSourceFactory;
import com.qlangtech.tis.plugin.ds.CMeta;

import com.qlangtech.tis.plugin.ds.DataSourceFactory;
import com.qlangtech.tis.plugin.ds.doris.DorisSourceFactory;
import com.qlangtech.tis.plugins.incr.flink.chunjun.sink.SinkTabPropsExtends;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import com.dtstack.chunjun.connector.jdbc.sink.SinkColMetas;
import com.dtstack.chunjun.connector.mysql.sink.MysqlOutputFormat;
import com.qlangtech.tis.plugin.ds.DBConfig;

import com.qlangtech.tis.plugin.ds.DataSourceFactory;
import com.qlangtech.tis.plugins.incr.flink.chunjun.common.DialectUtils;

Expand All @@ -46,7 +48,7 @@ public TISMysqlOutputFormat(DataSourceFactory dsFactory, SinkColMetas cols) {
@Override
protected Connection getConnection() throws SQLException {
DataSourceFactory dsFactory = Objects.requireNonNull(this.dsFactory, "dsFactory can not be null");
return dsFactory.getConnection(this.jdbcConf.getJdbcUrl(), false).getConnection();
return dsFactory.getConnection((this.jdbcConf.getJdbcUrl()), false).getConnection();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.dtstack.chunjun.connector.mysql.source.MysqlInputFormat;
import com.dtstack.chunjun.converter.IDeserializationConverter;
import com.dtstack.chunjun.element.column.BigDecimalColumn;
import com.qlangtech.tis.plugin.ds.DBConfig;

import com.qlangtech.tis.plugin.ds.DataSourceFactory;
import com.qlangtech.tis.plugin.ds.IColMetaGetter;
import com.qlangtech.tis.plugins.incr.flink.chunjun.common.DialectUtils;
Expand Down Expand Up @@ -52,7 +54,7 @@ public TISMysqlInputFormat(DataSourceFactory dataSourceFactory, List<IColMetaGet
@Override
protected Connection getConnection() throws SQLException {
return Objects.requireNonNull(dataSourceFactory, "dataSourceFactory can not be null")
.getConnection(jdbcConf.getJdbcUrl(), false).getConnection();
.getConnection((jdbcConf.getJdbcUrl()), false).getConnection();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import com.qlangtech.tis.datax.IStreamTableMeta;
import com.qlangtech.tis.datax.TableAlias;
import com.qlangtech.tis.extension.TISExtensible;
import com.qlangtech.tis.plugin.ds.JDBCConnection;
import com.qlangtech.tis.plugin.ds.JDBCConnectionPool;
import com.qlangtech.tis.plugin.incr.IncrStreamFactory;
import com.qlangtech.tis.plugin.incr.TISSinkFactory;
import com.qlangtech.tis.realtime.dto.DTOStream;
Expand Down Expand Up @@ -96,18 +98,24 @@ public static IStreamTableMeta getStreamTableMeta(
@Override
public JobExecutionResult consume(TargetResName dataxName, AsyncMsg<List<ReaderSource>> asyncMsg
, IDataxProcessor dataXProcessor) throws Exception {
StreamExecutionEnvironment env = getFlinkExecutionEnvironment();

if (CollectionUtils.isEmpty(asyncMsg.getFocusTabs())) {
throw new IllegalArgumentException("focusTabs can not be empty");
}
try (DefaultJDBCConnectionPool connectionPool = new DefaultJDBCConnectionPool()) {
JDBCConnection.connectionPool.set(connectionPool);
StreamExecutionEnvironment env = getFlinkExecutionEnvironment();

if (CollectionUtils.isEmpty(asyncMsg.getFocusTabs())) {
throw new IllegalArgumentException("focusTabs can not be empty");
}

Tab2OutputTag<DTOStream> tab2OutputTag = createTab2OutputTag(asyncMsg, env, dataxName);
Map<TableAlias, TabSinkFunc<SINK_TRANSFER_OBJ>> sinks = createTabSinkFunc(dataXProcessor);
// CountDownLatch countDown = new CountDownLatch(1);
Tab2OutputTag<DTOStream> tab2OutputTag = createTab2OutputTag(asyncMsg, env, dataxName);
Map<TableAlias, TabSinkFunc<SINK_TRANSFER_OBJ>> sinks = createTabSinkFunc(dataXProcessor);
// CountDownLatch countDown = new CountDownLatch(1);

this.processTableStream(env, tab2OutputTag, new SinkFuncs(sinks));
return executeFlinkJob(dataxName, env);
this.processTableStream(env, tab2OutputTag, new SinkFuncs(sinks));
return executeFlinkJob(dataxName, env);
} finally {
JDBCConnection.connectionPool.remove();
}
}

protected Map<TableAlias, TabSinkFunc<SINK_TRANSFER_OBJ>> createTabSinkFunc(
Expand Down
Loading

0 comments on commit 9b912e7

Please sign in to comment.