Skip to content

Commit

Permalink
add jdbc connection poll support for dataSource datavane/tis#366
Browse files Browse the repository at this point in the history
  • Loading branch information
baisui1981 committed Oct 5, 2024
1 parent a5f114d commit 3febff2
Show file tree
Hide file tree
Showing 48 changed files with 151 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ public String[] getHosts() {


@Override
public JDBCConnection getConnection(String jdbcUrl,boolean verify) throws SQLException {
public JDBCConnection createConnection(String jdbcUrl,boolean verify) throws SQLException {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.qlangtech.tis.plugin.ds.ColumnMetaData;
import com.qlangtech.tis.plugin.ds.DBConfig;
import com.qlangtech.tis.plugin.ds.DataType;
import com.qlangtech.tis.plugin.ds.JDBCConnection;
import com.qlangtech.tis.plugin.ds.TableInDB;
import com.qlangtech.tis.plugin.ds.TableNotFoundException;
import com.qlangtech.tis.sql.parser.tuple.creator.EntityName;
Expand Down Expand Up @@ -141,7 +142,7 @@ public final String getJdbcUrl() {
// }

@Override
public JDBCConnection getConnection(String jdbcUrl, boolean verify) throws SQLException {
public JDBCConnection createConnection(String jdbcUrl, boolean verify) throws SQLException {

try {
Class.forName(JDBC_DRIVER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ protected HiveTask(IDataSourceFactoryGetter dsFactoryGetter, ISqlTask nodeMeta,
* @return
* @throws Exception
*/
public static boolean isTableExists(DataSourceMeta ds, DataSourceMeta.JDBCConnection connection, EntityName dumpTable) throws Exception {
public static boolean isTableExists(DataSourceMeta ds, JDBCConnection connection, EntityName dumpTable) throws Exception {
// 判断表是否存在
// if (!isDBExists(mrEngine, connection, dumpTable.getDbName())) {
// // DB都不存在,table肯定就不存在啦
Expand Down Expand Up @@ -127,7 +127,7 @@ public static boolean isTableExists(DataSourceMeta ds, DataSourceMeta.JDBCConnec
// return contain;
}

// public static boolean isDBExists(MREngine mrEngine, DataSourceMeta.JDBCConnection connection, String dbName) throws Exception {
// public static boolean isDBExists(MREngine mrEngine, JDBCConnection connection, String dbName) throws Exception {
// AtomicBoolean dbExist = new AtomicBoolean(false);
// connection.query(mrEngine.showDBSQL, result -> {
// if (StringUtils.equals(result.getString(1), dbName)) {
Expand Down Expand Up @@ -200,7 +200,7 @@ protected final void executeTask(String taskname) {
String insertSql = rewriteSql.convert2InsertIntoSQL(this.dsFactoryGetter.getDataSourceFactory(), this.nodeMeta.getExportName());

this.validateDependenciesNode(taskname);
final DataSourceMeta.JDBCConnection conn = this.getTaskContextObj();
final JDBCConnection conn = this.getTaskContextObj();
DataSourceFactory dsFactory = dsFactoryGetter.getDataSourceFactory();

//final String newCreatePt = primaryTable.getTabPartition();
Expand Down Expand Up @@ -245,7 +245,7 @@ protected final void executeTask(String taskname) {
*/
private void processJoinTask(ISqlTask.RewriteSql sql) {
try {
final DataSourceMeta.JDBCConnection conn = this.getTaskContextObj();
final JDBCConnection conn = this.getTaskContextObj();
final ColsParser insertParser = new ColsParser(sql, conn);

DataSourceFactory dsFactory = this.dsFactoryGetter.getDataSourceFactory();
Expand All @@ -265,11 +265,11 @@ private void processJoinTask(ISqlTask.RewriteSql sql) {
* @param partitionRetainNum 保留多少个分区
* @throws Exception
*/
protected abstract void initializeTable(DataSourceMeta mrEngine, ColsParser insertParser, DataSourceMeta.JDBCConnection conn, EntityName dumpTable, Integer partitionRetainNum) throws Exception;
protected abstract void initializeTable(DataSourceMeta mrEngine, ColsParser insertParser, JDBCConnection conn, EntityName dumpTable, Integer partitionRetainNum) throws Exception;

protected abstract void executeSql(String sql, DataSourceMeta.JDBCConnection conn) throws SQLException;
protected abstract void executeSql(String sql, JDBCConnection conn) throws SQLException;

protected abstract List<String> getHistoryPts(DataSourceMeta mrEngine, DataSourceMeta.JDBCConnection conn, final EntityName table) throws Exception;
protected abstract List<String> getHistoryPts(DataSourceMeta mrEngine, JDBCConnection conn, final EntityName table) throws Exception;

protected void validateDependenciesNode(String taskname) {
Boolean dependencyWorkStatus = null;
Expand Down Expand Up @@ -356,10 +356,10 @@ private Object getValue(String columnLabel) throws SQLException {

protected class ColsParser {
private final ISqlTask.RewriteSql sql;
private final DataSourceMeta.JDBCConnection conn;
private final JDBCConnection conn;
private AbstractInsertFromSelectParser sqlParser;

public ColsParser(ISqlTask.RewriteSql sql, DataSourceMeta.JDBCConnection conn) {
public ColsParser(ISqlTask.RewriteSql sql, JDBCConnection conn) {
this.sql = sql;
this.conn = conn;
}
Expand Down Expand Up @@ -391,7 +391,7 @@ public List<HiveColumn> getColsExcludePartitionCols() {
}
}

private AbstractInsertFromSelectParser getSQLParserResult(String sql, DataSourceMeta.JDBCConnection conn) {
private AbstractInsertFromSelectParser getSQLParserResult(String sql, JDBCConnection conn) {
DataSourceFactory dsFactory = this.dsFactoryGetter.getDataSourceFactory();
Function<ISqlTask.RewriteSql, List<ColumnMetaData>> sqlColMetaGetter = (rewriteSql) -> {
List<ColMeta> cols = rewriteSql.getCols();
Expand Down Expand Up @@ -423,7 +423,7 @@ public ColumnMetaData create(String colName, int index) throws SQLException {
}

// private AbstractInsertFromSelectParser getSQLParserResult(
// String sql, DataSourceFactory dsFactory, DataSourceMeta.JDBCConnection conn, AbstractInsertFromSelectParser insertParser) {
// String sql, DataSourceFactory dsFactory, JDBCConnection conn, AbstractInsertFromSelectParser insertParser) {
//
// TabPartitions tabPartition = new TabPartitions(Collections.emptyMap()) {
// @Override
Expand Down Expand Up @@ -454,7 +454,7 @@ public interface IHistoryTableProcessor {
* @param dumpTable
* @throws Exception
*/
public static void initializeTable(DataSourceMeta ds, DataSourceMeta.JDBCConnection conn
public static void initializeTable(DataSourceMeta ds, JDBCConnection conn
, EntityName dumpTable, IHistoryTableProcessor historyTableProcessor, Supplier<Boolean> tableSameJudgement, Runnable tableCreator) throws Exception {
// if (partitionRetainNum == null || partitionRetainNum < 1) {
// throw new IllegalArgumentException("illegal param partitionRetainNum ");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.qlangtech.tis.plugin.ds.DataSourceMeta;
import com.qlangtech.tis.plugin.ds.IDataSourceFactoryGetter;
import com.qlangtech.tis.plugin.ds.IInitWriterTableExecutor;
import com.qlangtech.tis.plugin.ds.JDBCConnection;
import com.qlangtech.tis.plugin.ds.PostedDSProp;
import com.qlangtech.tis.plugin.ds.TableNotFoundException;
import com.qlangtech.tis.runtime.module.misc.IControlMsgHandler;
Expand Down Expand Up @@ -162,7 +163,7 @@ private static void process(String dataXName, BasicDataXRdbmsWriter<BasicDataSou
IDataxProcessor processor = DataxProcessor.load(null, StoreResourceType.DataApp, dataXName);
DataSourceFactory dsFactory = dataXWriter.getDataSourceFactory();
for (String jdbcUrl : jdbcUrls) {
try (DataSourceMeta.JDBCConnection conn = dsFactory.getConnection(jdbcUrl, false)) {
try (JDBCConnection conn = dsFactory.getConnection(jdbcUrl, false)) {
process(dataXName, processor, dataXWriter, dataXWriter, conn, tableName);
}
}
Expand All @@ -178,7 +179,7 @@ private static void process(String dataXName, BasicDataXRdbmsWriter<BasicDataSou
* @return tableExist 表是否存在
*/
public static boolean process(String dataXName, IDataxProcessor processor
, IDataSourceFactoryGetter dsGetter, IDataxWriter dataXWriter, DataSourceMeta.JDBCConnection jdbcConn
, IDataSourceFactoryGetter dsGetter, IDataxWriter dataXWriter, JDBCConnection jdbcConn
, String tableName) {
if (StringUtils.isEmpty(dataXName)) {
throw new IllegalArgumentException("param dataXName can not be null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.qlangtech.tis.plugin.ds.DBConfig;
import com.qlangtech.tis.plugin.ds.DataSourceFactory;
import com.qlangtech.tis.plugin.ds.DataSourceMeta;
import com.qlangtech.tis.plugin.ds.JDBCConnection;
import com.qlangtech.tis.plugin.ds.TableNotFoundException;
import com.qlangtech.tis.sql.parser.tuple.creator.EntityName;
import com.qlangtech.tis.util.Memoizer;
Expand All @@ -42,15 +43,15 @@ public class TableColsMeta extends Memoizer<String, Map<String, ColumnMetaData>>
private final DataSourceFactory datasource;
private final String dbName;

private final DataSourceMeta.JDBCConnection connection;
private final JDBCConnection connection;


public TableColsMeta(DataSourceFactory datasource, String dbName) {
this.datasource = datasource;
this.dbName = dbName;
final DBConfig dbConfig = datasource.getDbConfig();

AtomicReference<DataSourceMeta.JDBCConnection> conn = new AtomicReference<>();
AtomicReference<JDBCConnection> conn = new AtomicReference<>();
try {
dbConfig.vistDbName((config, jdbcUrl, ip, dbname) -> {
conn.set(datasource.getConnection(jdbcUrl, false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,10 +242,10 @@ protected TableInDB createTableInDB() {
return TableInDB.create(this);
}

@Override
public JDBCConnection getConnection(String jdbcUrl, boolean verify) throws SQLException {
return super.getConnection(jdbcUrl, verify);
}
// @Override
// public JDBCConnection createConnection(String jdbcUrl, boolean verify) throws SQLException {
// return super.createConnection(jdbcUrl, verify);
// }


protected String getRefectTablesSql() {
Expand Down Expand Up @@ -309,7 +309,7 @@ private void visitConnection(DBConfig db, String ip, String dbName


JDBCConnectionFactory connectionFactory = (url, verify) -> getConnection(url, verify);

validateConnection(connectionFactory, jdbcUrl, p);
} catch (TisException e) {
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.qlangtech.tis.plugin.ds.DataSourceFactory;
import com.qlangtech.tis.plugin.ds.DataType;
import com.qlangtech.tis.plugin.ds.IDataSourceDumper;
import com.qlangtech.tis.plugin.ds.JDBCConnection;
import com.qlangtech.tis.plugin.ds.JDBCTypes;
import com.qlangtech.tis.plugin.ds.SplitTableStrategy;
import com.qlangtech.tis.plugin.ds.TISTable;
Expand Down Expand Up @@ -96,7 +97,7 @@ protected EntityName logicTable2PhysicsTable(String jdbcUrl, EntityName table) {
private transient dm.jdbc.driver.DmDriver driver;

@Override
public JDBCConnection getConnection(String jdbcUrl, boolean verify) throws SQLException {
public JDBCConnection createConnection(String jdbcUrl, boolean verify) throws SQLException {
if (driver == null) {
driver = new dm.jdbc.driver.DmDriver();
}
Expand Down Expand Up @@ -522,7 +523,8 @@ protected String getDataSourceName() {
}

@Override
protected boolean validateConnection(JDBCConnection conn, BasicDataSourceFactory dsFactory, IControlMsgHandler msgHandler, Context context) throws TisException {
protected boolean validateConnection(JDBCConnection conn, BasicDataSourceFactory dsFactory
, IControlMsgHandler msgHandler, Context context) throws TisException {
final String schema = conn.getSchema();
DaMengDataSourceFactory damengSource = (DaMengDataSourceFactory) dsFactory;
if (!StringUtils.equals(schema, damengSource.dbName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public void testTempateGenerate() throws Exception {
DaMengDataSourceFactory mysqlDs = TestDaMengDataSourceFactory.createDaMengDataSourceFactory();
// new DaMengDataSourceFactory() {
// @Override
// public DataSourceMeta.JDBCConnection getConnection(String jdbcUrl) throws SQLException {
// public JDBCConnection getConnection(String jdbcUrl) throws SQLException {
// return null;
// }
// };
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.qlangtech.tis.plugin.ds.ColumnMetaData;
import com.qlangtech.tis.plugin.ds.DBConfig;
import com.qlangtech.tis.plugin.ds.DataType;
import com.qlangtech.tis.plugin.ds.JDBCConnection;
import com.qlangtech.tis.plugin.ds.JDBCTypes;
import com.qlangtech.tis.plugin.ds.TableNotFoundException;
import com.qlangtech.tis.runtime.module.misc.IControlMsgHandler;
Expand Down Expand Up @@ -108,7 +109,7 @@ public String buidJdbcUrl(DBConfig db, String ip, String dbName) {


@Override
public JDBCConnection getConnection(String jdbcUrl, boolean verify) throws SQLException {
public JDBCConnection createConnection(String jdbcUrl, boolean verify) throws SQLException {
Properties props = new Properties();
props.put("useSSL", "false");
props.put("user", StringUtils.trimToEmpty(this.userName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.qlangtech.tis.annotation.Public;
import com.qlangtech.tis.extension.TISExtension;
import com.qlangtech.tis.plugin.ds.DBConfig;
import com.qlangtech.tis.plugin.ds.JDBCConnection;
import com.qlangtech.tis.plugin.ds.mysql.MySQLDataSourceFactory;
import com.qlangtech.tis.sql.parser.tuple.creator.EntityName;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -42,7 +43,7 @@ public class MariaDBDataSourceFactory extends MySQLDataSourceFactory {
private transient org.mariadb.jdbc.Driver driver;

@Override
public JDBCConnection getConnection(String jdbcUrl, boolean verify) throws SQLException {
public JDBCConnection createConnection(String jdbcUrl, boolean verify) throws SQLException {
if (driver == null) {
driver = new org.mariadb.jdbc.Driver();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import com.qlangtech.tis.plugin.ds.IColMetaGetter;
import com.qlangtech.tis.plugin.ds.IDataSourceFactoryGetter;
import com.qlangtech.tis.plugin.ds.ISelectedTab;
import com.qlangtech.tis.plugin.ds.JDBCConnection;
import com.qlangtech.tis.runtime.module.misc.IFieldErrorHandler;
import com.qlangtech.tis.sql.parser.ISqlTask;
import com.qlangtech.tis.sql.parser.TabPartitions;
Expand Down Expand Up @@ -322,10 +323,10 @@ public boolean isGenerateCreateDDLSwitchOff() {
@Override
public ExecuteResult startTask(ITableBuildTask dumpTask) {

try (DataSourceMeta.JDBCConnection conn = this.getConnection()) {
try (JDBCConnection conn = this.getConnection()) {
return dumpTask.process(new ITaskContext() {
@Override
public DataSourceMeta.JDBCConnection getObj() {
public JDBCConnection getObj() {
return conn;
}
});
Expand All @@ -335,7 +336,7 @@ public DataSourceMeta.JDBCConnection getObj() {

}

public DataSourceMeta.JDBCConnection getConnection() {
public JDBCConnection getConnection() {
OdpsDataSourceFactory dsFactory = getDataSourceFactory();
String jdbcUrl = dsFactory.getJdbcUrl();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.qlangtech.tis.plugin.ds.DataSourceMeta;
import com.qlangtech.tis.plugin.ds.IColMetaGetter;
import com.qlangtech.tis.plugin.ds.IDataSourceFactoryGetter;
import com.qlangtech.tis.plugin.ds.JDBCConnection;
import com.qlangtech.tis.sql.parser.ISqlTask;
import com.qlangtech.tis.sql.parser.er.IPrimaryTabFinder;
import com.qlangtech.tis.sql.parser.tuple.creator.EntityName;
Expand Down Expand Up @@ -67,7 +68,7 @@ public JoinOdpsTask(DataXOdpsWriter odpsWriter, IDataSourceFactoryGetter dsFacto
}

@Override
protected void executeSql(String sql, DataSourceMeta.JDBCConnection conn) throws SQLException {
protected void executeSql(String sql, JDBCConnection conn) throws SQLException {

OdpsDataSourceFactory dsFactory
= (OdpsDataSourceFactory) dsFactoryGetter.getDataSourceFactory();
Expand Down Expand Up @@ -172,7 +173,7 @@ public boolean change(Instance.TaskStatus ts) {

@Override
protected List<String> getHistoryPts(
DataSourceMeta mrEngine, DataSourceMeta.JDBCConnection conn, EntityName table) throws Exception {
DataSourceMeta mrEngine, JDBCConnection conn, EntityName table) throws Exception {
OdpsDataSourceFactory dsFactory = (OdpsDataSourceFactory) mrEngine;
Table tab = dsFactory.getOdpsTable(table, Optional.empty());
PartitionSpec ptSpec = null;
Expand All @@ -189,7 +190,7 @@ protected List<String> getHistoryPts(

@Override
protected void initializeTable(DataSourceMeta ds, ColsParser insertParser
, DataSourceMeta.JDBCConnection conn, EntityName dumpTable, Integer partitionRetainNum) throws Exception {
, JDBCConnection conn, EntityName dumpTable, Integer partitionRetainNum) throws Exception {

OdpsDataSourceFactory dsFactory = (OdpsDataSourceFactory) ds;
initializeTable(ds, conn, dumpTable,
Expand Down Expand Up @@ -222,7 +223,7 @@ public void cleanHistoryTable() throws IOException {
* @param conn
*/
private void createTable(OdpsDataSourceFactory dsFactory, EntityName dumpTable
, ColsParser insertParser, DataSourceMeta.JDBCConnection conn) {
, ColsParser insertParser, JDBCConnection conn) {
// ISqlTask.RewriteSql rewriteSql = insertParser.getSql();
// String sql = rewriteSql.rewriteSql;
// try {
Expand Down Expand Up @@ -256,7 +257,7 @@ private void createTable(OdpsDataSourceFactory dsFactory, EntityName dumpTable
}
}

private boolean isTableSame(OdpsDataSourceFactory dsFactory, DataSourceMeta.JDBCConnection conn
private boolean isTableSame(OdpsDataSourceFactory dsFactory, JDBCConnection conn
, ColsParser allCols, EntityName dumpTable) {
TableSchema tabSchema = dsFactory.getTableSchema(dumpTable);
for (HiveColumn col : allCols.getColsExcludePartitionCols()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import com.qlangtech.tis.plugin.ds.ColumnMetaData;
import com.qlangtech.tis.plugin.ds.DBConfig;
import com.qlangtech.tis.plugin.ds.DataSourceMeta;
import com.qlangtech.tis.plugin.ds.JDBCConnection;
import com.qlangtech.tis.plugin.ds.TableNotFoundException;
import com.qlangtech.tis.runtime.module.misc.IControlMsgHandler;
import com.qlangtech.tis.sql.parser.tuple.creator.EntityName;
Expand Down Expand Up @@ -167,7 +168,7 @@ public String getDbName() {
}

@Override
public DataSourceMeta.JDBCConnection getConnection(String jdbcUrl, boolean verify) throws SQLException {
public JDBCConnection createConnection(String jdbcUrl, boolean verify) throws SQLException {
try {
Class.forName(DRIVER_NAME);
} catch (ClassNotFoundException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.qlangtech.tis.plugin.ds.DBConfig;
import com.qlangtech.tis.plugin.ds.DataSourceFactory;
import com.qlangtech.tis.plugin.ds.DataType;
import com.qlangtech.tis.plugin.ds.JDBCConnection;
import com.qlangtech.tis.plugin.ds.JDBCTypes;
import com.qlangtech.tis.plugin.ds.TableNotFoundException;
import com.qlangtech.tis.runtime.module.misc.IFieldErrorHandler;
Expand Down Expand Up @@ -261,7 +262,7 @@ protected DataType getDataType(String keyName) throws SQLException {


@Override
public JDBCConnection getConnection(String jdbcUrl, boolean verify) throws SQLException {
public JDBCConnection createConnection(String jdbcUrl, boolean verify) throws SQLException {
try {
Class.forName("oracle.jdbc.OracleDriver");
} catch (ClassNotFoundException e) {
Expand Down
Loading

0 comments on commit 3febff2

Please sign in to comment.