add cache for the initialize-table process on the sink side, datavane/tis#366
baisui1981 committed Oct 9, 2024
1 parent d5a77b7 commit 207e715
Showing 4 changed files with 57 additions and 45 deletions.
@@ -36,7 +36,7 @@
import com.qlangtech.tis.plugin.annotation.Validator;
import com.qlangtech.tis.plugin.ds.BasicDataSourceFactory;
import com.qlangtech.tis.plugin.ds.ColumnMetaData;

import com.qlangtech.tis.plugin.ds.DataSourceFactory;
import com.qlangtech.tis.plugin.ds.DataSourceMeta;
import com.qlangtech.tis.plugin.ds.IDataSourceFactoryGetter;
@@ -147,9 +147,9 @@ protected Class<RdbmsWriterDescriptor> getExpectDescClass() {

@Override
public final void initWriterTable(String targetTabName, List<String> jdbcUrls) throws Exception {
if (RobustReflectionConverter2.usedPluginInfo.get().isDryRun()) {
return;
}
// if (RobustReflectionConverter2.usedPluginInfo.get().isDryRun()) {
// return;
// }
process(this.dataXName, (BasicDataXRdbmsWriter<BasicDataSourceFactory>) this, targetTabName, jdbcUrls);
}

@@ -179,58 +179,66 @@ private static void process(String dataXName, BasicDataXRdbmsWriter<BasicDataSou
* @param tableName
* @return tableExist 表是否存在
*/
public static boolean process(String dataXName, IDataxProcessor processor
public static void process(String dataXName, IDataxProcessor processor
, IDataSourceFactoryGetter dsGetter, IDataxWriter dataXWriter, JDBCConnection jdbcConn
, String tableName) {
if (StringUtils.isEmpty(dataXName)) {
throw new IllegalArgumentException("param dataXName can not be null");
}
Objects.requireNonNull(dataXWriter, "dataXWriter can not be null,dataXName:" + dataXName);
boolean autoCreateTable = !dataXWriter.isGenerateCreateDDLSwitchOff();
try {
if (autoCreateTable) {
// try {
if (autoCreateTable) {


File createDDL = new File(processor.getDataxCreateDDLDir(null)
, tableName + DataXCfgFile.DATAX_CREATE_DDL_FILE_NAME_SUFFIX);
if (!createDDL.exists()) {
throw new IllegalStateException("create table script is not exist:" + createDDL.getAbsolutePath());
}
Connection conn = jdbcConn.getConnection();
DataSourceFactory dsFactory = dsGetter.getDataSourceFactory();
String createScript = FileUtils.readFileToString(createDDL, TisUTF8.get());
final EntityName tab = EntityName.parse(tableName);

boolean tableExist = false;
List<ColumnMetaData> cols = Lists.newArrayList();
jdbcConn.initializeSinkTab(tableName, () -> {
try {
cols = dsFactory.getTableMetadata(jdbcConn, true, tab);
tableExist = true;
} catch (TableNotFoundException e) {
logger.warn(e.toString());
}
File createDDL = new File(processor.getDataxCreateDDLDir(null)
, tableName + DataXCfgFile.DATAX_CREATE_DDL_FILE_NAME_SUFFIX);
if (!createDDL.exists()) {
throw new IllegalStateException("create table script is not exist:" + createDDL.getAbsolutePath());
}
Connection conn = jdbcConn.getConnection();
DataSourceFactory dsFactory = dsGetter.getDataSourceFactory();
String createScript = FileUtils.readFileToString(createDDL, TisUTF8.get());
final EntityName tab = EntityName.parse(tableName);

if (!tableExist) {
// 表不存在
boolean success = false;
boolean tableExist = false;
List<ColumnMetaData> cols = Lists.newArrayList();
try {
try (Statement statement = conn.createStatement()) {
logger.info("create table:{}\n script:{}", tab.getFullName(), createScript);
success = statement.execute(createScript);
cols = dsFactory.getTableMetadata(jdbcConn, true, tab);
tableExist = true;
} catch (TableNotFoundException e) {
logger.warn(e.toString());
}

if (!tableExist) {
// 表不存在
boolean success = false;
try {
try (Statement statement = conn.createStatement()) {
logger.info("create table:{}\n script:{}", tab.getFullName(), createScript);
success = statement.execute(createScript);
}
} catch (SQLException e) {
throw new RuntimeException(createScript, e);
}
} catch (SQLException e) {
throw new RuntimeException(createScript, e);
} else {
logger.info("table:{},cols:{} already exist ,skip the create table step", tab.getFullName()
, cols.stream().map((col) -> col.getName()).collect(Collectors.joining(",")));
}
} else {
logger.info("table:{},cols:{} already exist ,skip the create table step", tab.getFullName()
, cols.stream().map((col) -> col.getName()).collect(Collectors.joining(",")));
// return tableExist;
} catch (IOException e) {
throw new RuntimeException(e);
}
return tableExist;
}
} catch (IOException e) {
throw new RuntimeException(e);


});

}
return false;
// } catch (IOException e) {
// throw new RuntimeException(e);
// }
// return false;
}
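
Note on the hunk above: table initialization is now routed through jdbcConn.initializeSinkTab(tableName, () -> { ... }), presumably so the DDL probe-and-create work is cached and runs at most once per sink table instead of once per writer task (hence the commit title). The real behavior is whatever JDBCConnection.initializeSinkTab does in the TIS codebase; the snippet below is only a rough sketch of such a per-table cache, with a hypothetical class name, field name and Runnable-based signature:

import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a per-table initialization cache; not the actual TIS implementation.
public class SinkTabInitializeCache {

    // Records which sink tables have already been initialized in this process.
    private static final ConcurrentHashMap<String, Boolean> INITIALIZED_TABS = new ConcurrentHashMap<>();

    // Runs the supplied initializer at most once per table name; concurrent callers
    // for the same table block until the first initializer has finished.
    public static void initializeSinkTab(String tableName, Runnable initializer) {
        INITIALIZED_TABS.computeIfAbsent(tableName, tab -> {
            initializer.run(); // e.g. load the create-table DDL and execute it if the table is missing
            return Boolean.TRUE;
        });
    }
}

With a cache like this, repeated initWriterTable calls from parallel sink subtasks stay cheap after the first one, which would also explain why the dry-run early return in the second hunk could be commented out.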


@@ -5,6 +5,7 @@
import com.google.common.collect.Lists;
import com.qlangtech.plugins.incr.flink.cdc.TestSelectedTab;
import com.qlangtech.tis.TIS;
import com.qlangtech.tis.datax.DataXCfgFile;
import com.qlangtech.tis.datax.IDataxProcessor;
import com.qlangtech.tis.datax.impl.DataxProcessor;
import com.qlangtech.tis.manage.common.TisUTF8;
@@ -28,6 +29,7 @@
import org.junit.runners.Parameterized;

import java.io.File;
import java.util.Optional;

/**
* @author 百岁 (baisui@qlangtech.com)
@@ -78,7 +80,7 @@ public DataSourceFactory getPlugin() {
final File ddlDir = folder.newFolder("ddlDir");
EasyMock.expect(processor.getDataxCreateDDLDir(null)).andReturn(ddlDir);
AbstractCreateTableSqlBuilder.CreateDDL createDDL
= this.dataXWriter.generateCreateDDL(new IDataxProcessor.TableMap(tab));
= this.dataXWriter.generateCreateDDL(new IDataxProcessor.TableMap(tab), Optional.empty());

FileUtils.write(new File(ddlDir
, tab.getName() + DataXCfgFile.DATAX_CREATE_DDL_FILE_NAME_SUFFIX), createDDL.getDDLScript());
@@ -77,7 +77,7 @@ public DolphinSchedulerEndpoint createConfigInstance() {
private void tryConnect(IControlMsgHandler msgHandler, Context context) {
//http://192.168.28.201:12345/dolphinscheduler/swagger-ui/index.html?language=zh_CN&lang=cn#/Worker%E5%88%86%E7%BB%84%E7%AE%A1%E7%90%86/queryAllWorkerGroupsPaging
DolphinSchedulerResponse response = createSchedulerURLBuilder().appendSubPath("worker-groups")
// .appendQueryParam(FIELD_PAGE_NO, 1).appendQueryParam(FIELD_PAGE_SIZE, 100)
// .appendQueryParam(FIELD_PAGE_NO, 1).appendQueryParam(FIELD_PAGE_SIZE, 100)
.appendPageParam()
.applyGet(Optional.of(new StreamErrorProcess() {
@Override
@@ -89,7 +89,9 @@ public void error(int status, InputStream errstream, IOException e) throws Excep
}
}
}));

if (context.hasErrors()) {
return;
}
if (!response.isSuccess()) {
throw TisException.create(response.getMessage());
}
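
The added context.hasErrors() check above lets tryConnect stop quietly once the StreamErrorProcess callback has already turned an HTTP failure into a form-level error, instead of falling through to the response checks. A minimal, self-contained sketch of that pattern follows; Context, ErrorCallback and fetchWorkerGroups are illustrative stand-ins, not the actual TIS or DolphinScheduler plugin types:

import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of "record the error in the context, then return early"; all names are hypothetical.
class ConnectCheckSketch {

    static class Context {
        private final List<String> errors = new ArrayList<>();
        void addError(String msg) { errors.add(msg); }
        boolean hasErrors() { return !errors.isEmpty(); }
    }

    interface ErrorCallback {
        void error(int status, String body);
    }

    // Stand-in for the real HTTP call; here it always reports a failure to the callback.
    static String fetchWorkerGroups(ErrorCallback onError) {
        onError.error(401, "{\"msg\":\"invalid token\"}");
        return null;
    }

    static void tryConnect(Context context) {
        String response = fetchWorkerGroups(
                (status, body) -> context.addError("DolphinScheduler returned HTTP " + status));
        if (context.hasErrors()) {
            return; // the error is already attached to the form; no exception needed
        }
        // otherwise keep validating the successful response ...
    }
}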
@@ -601,7 +601,7 @@ public IStreamTableMeta load(String tableName) throws Exception {

final BasicDataSourceFactory ds = (BasicDataSourceFactory) writer.getDataSourceFactory();
// 初始化表RDBMS的表,如果表不存在就创建表
// DataxWriter.process(dataXName, tableName, ds.getJdbcUrls());
DataxWriter.process(dataXName, tableName, ds.getJdbcUrls());
final List<IColMetaGetter> colsMeta = ds.getTableMetadata(true, EntityName.parse(tableName))
.stream().map((c) -> new HdfsColMeta(c.getName(), c.isNullable(), c.isPk(), c.getType()))
.collect(Collectors.toList());
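
In this last hunk the previously commented-out DataxWriter.process(dataXName, tableName, ds.getJdbcUrls()) call is re-enabled, so the sink table is initialized (created from the generated DDL if it does not exist yet) before its column metadata is read to build the HdfsColMeta list. A compact sketch of that ordering, using hypothetical ensureSinkTable/readSinkColumns helpers in place of the real TIS calls:

import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; ensureSinkTable and readSinkColumns stand in for
// DataxWriter.process(...) and ds.getTableMetadata(...) and are not real TIS APIs.
class StreamTableMetaSketch {

    static class ColMeta {
        final String name;
        final boolean nullable;
        final boolean pk;
        ColMeta(String name, boolean nullable, boolean pk) {
            this.name = name;
            this.nullable = nullable;
            this.pk = pk;
        }
    }

    List<ColMeta> load(String tableName) {
        // 1. make sure the sink table exists: run the generated create-table DDL if it is missing
        ensureSinkTable(tableName);
        // 2. only after that, read the column metadata the streaming side needs
        return readSinkColumns(tableName);
    }

    void ensureSinkTable(String tableName) { /* cached, executed at most once per table */ }

    List<ColMeta> readSinkColumns(String tableName) { return new ArrayList<>(); }
}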
