diff --git a/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/BasicDataXRdbmsWriter.java b/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/BasicDataXRdbmsWriter.java index a9e0e1dd1..71ac1528f 100644 --- a/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/BasicDataXRdbmsWriter.java +++ b/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/BasicDataXRdbmsWriter.java @@ -36,7 +36,7 @@ import com.qlangtech.tis.plugin.annotation.Validator; import com.qlangtech.tis.plugin.ds.BasicDataSourceFactory; import com.qlangtech.tis.plugin.ds.ColumnMetaData; - + import com.qlangtech.tis.plugin.ds.DataSourceFactory; import com.qlangtech.tis.plugin.ds.DataSourceMeta; import com.qlangtech.tis.plugin.ds.IDataSourceFactoryGetter; @@ -147,9 +147,9 @@ protected Class getExpectDescClass() { @Override public final void initWriterTable(String targetTabName, List jdbcUrls) throws Exception { - if (RobustReflectionConverter2.usedPluginInfo.get().isDryRun()) { - return; - } +// if (RobustReflectionConverter2.usedPluginInfo.get().isDryRun()) { +// return; +// } process(this.dataXName, (BasicDataXRdbmsWriter) this, targetTabName, jdbcUrls); } @@ -179,7 +179,7 @@ private static void process(String dataXName, BasicDataXRdbmsWriter cols = Lists.newArrayList(); + jdbcConn.initializeSinkTab(tableName, () -> { try { - cols = dsFactory.getTableMetadata(jdbcConn, true, tab); - tableExist = true; - } catch (TableNotFoundException e) { - logger.warn(e.toString()); - } + File createDDL = new File(processor.getDataxCreateDDLDir(null) + , tableName + DataXCfgFile.DATAX_CREATE_DDL_FILE_NAME_SUFFIX); + if (!createDDL.exists()) { + throw new IllegalStateException("create table script is not exist:" + createDDL.getAbsolutePath()); + } + Connection conn = jdbcConn.getConnection(); + DataSourceFactory dsFactory = dsGetter.getDataSourceFactory(); + String createScript = FileUtils.readFileToString(createDDL, TisUTF8.get()); + final EntityName tab = EntityName.parse(tableName); - if (!tableExist) { - // 表不存在 - boolean success = false; + boolean tableExist = false; + List cols = Lists.newArrayList(); try { - try (Statement statement = conn.createStatement()) { - logger.info("create table:{}\n script:{}", tab.getFullName(), createScript); - success = statement.execute(createScript); + cols = dsFactory.getTableMetadata(jdbcConn, true, tab); + tableExist = true; + } catch (TableNotFoundException e) { + logger.warn(e.toString()); + } + + if (!tableExist) { + // 表不存在 + boolean success = false; + try { + try (Statement statement = conn.createStatement()) { + logger.info("create table:{}\n script:{}", tab.getFullName(), createScript); + success = statement.execute(createScript); + } + } catch (SQLException e) { + throw new RuntimeException(createScript, e); } - } catch (SQLException e) { - throw new RuntimeException(createScript, e); + } else { + logger.info("table:{},cols:{} already exist ,skip the create table step", tab.getFullName() + , cols.stream().map((col) -> col.getName()).collect(Collectors.joining(","))); } - } else { - logger.info("table:{},cols:{} already exist ,skip the create table step", tab.getFullName() - , cols.stream().map((col) -> col.getName()).collect(Collectors.joining(","))); + // return tableExist; + } catch (IOException e) { + throw new RuntimeException(e); } - return tableExist; - } - } catch (IOException e) { - throw new RuntimeException(e); + + + }); + } - return false; +// } catch (IOException e) { +// throw new RuntimeException(e); +// } + // return false; } diff --git a/tis-datax/tis-datax-dameng-plugin/src/test/java/com/qlangtech/tis/plugin/datax/dameng/reader/BasicRDBMSDataXReaderTest.java b/tis-datax/tis-datax-dameng-plugin/src/test/java/com/qlangtech/tis/plugin/datax/dameng/reader/BasicRDBMSDataXReaderTest.java index 57c5dd230..31656dad6 100644 --- a/tis-datax/tis-datax-dameng-plugin/src/test/java/com/qlangtech/tis/plugin/datax/dameng/reader/BasicRDBMSDataXReaderTest.java +++ b/tis-datax/tis-datax-dameng-plugin/src/test/java/com/qlangtech/tis/plugin/datax/dameng/reader/BasicRDBMSDataXReaderTest.java @@ -5,6 +5,7 @@ import com.google.common.collect.Lists; import com.qlangtech.plugins.incr.flink.cdc.TestSelectedTab; import com.qlangtech.tis.TIS; +import com.qlangtech.tis.datax.DataXCfgFile; import com.qlangtech.tis.datax.IDataxProcessor; import com.qlangtech.tis.datax.impl.DataxProcessor; import com.qlangtech.tis.manage.common.TisUTF8; @@ -28,6 +29,7 @@ import org.junit.runners.Parameterized; import java.io.File; +import java.util.Optional; /** * @author 百岁 (baisui@qlangtech.com) @@ -78,7 +80,7 @@ public DataSourceFactory getPlugin() { final File ddlDir = folder.newFolder("ddlDir"); EasyMock.expect(processor.getDataxCreateDDLDir(null)).andReturn(ddlDir); AbstractCreateTableSqlBuilder.CreateDDL createDDL - = this.dataXWriter.generateCreateDDL(new IDataxProcessor.TableMap(tab)); + = this.dataXWriter.generateCreateDDL(new IDataxProcessor.TableMap(tab), Optional.empty()); FileUtils.write(new File(ddlDir , tab.getName() + DataXCfgFile.DATAX_CREATE_DDL_FILE_NAME_SUFFIX), createDDL.getDDLScript()); diff --git a/tis-datax/tis-datax-dolphinscheduler-plugin/src/main/java/com/qlangtech/tis/plugin/datax/doplinscheduler/export/DolphinSchedulerEndpoint.java b/tis-datax/tis-datax-dolphinscheduler-plugin/src/main/java/com/qlangtech/tis/plugin/datax/doplinscheduler/export/DolphinSchedulerEndpoint.java index f14498746..175a01026 100644 --- a/tis-datax/tis-datax-dolphinscheduler-plugin/src/main/java/com/qlangtech/tis/plugin/datax/doplinscheduler/export/DolphinSchedulerEndpoint.java +++ b/tis-datax/tis-datax-dolphinscheduler-plugin/src/main/java/com/qlangtech/tis/plugin/datax/doplinscheduler/export/DolphinSchedulerEndpoint.java @@ -77,7 +77,7 @@ public DolphinSchedulerEndpoint createConfigInstance() { private void tryConnect(IControlMsgHandler msgHandler, Context context) { //http://192.168.28.201:12345/dolphinscheduler/swagger-ui/index.html?language=zh_CN&lang=cn#/Worker%E5%88%86%E7%BB%84%E7%AE%A1%E7%90%86/queryAllWorkerGroupsPaging DolphinSchedulerResponse response = createSchedulerURLBuilder().appendSubPath("worker-groups") - // .appendQueryParam(FIELD_PAGE_NO, 1).appendQueryParam(FIELD_PAGE_SIZE, 100) + // .appendQueryParam(FIELD_PAGE_NO, 1).appendQueryParam(FIELD_PAGE_SIZE, 100) .appendPageParam() .applyGet(Optional.of(new StreamErrorProcess() { @Override @@ -89,7 +89,9 @@ public void error(int status, InputStream errstream, IOException e) throws Excep } } })); - + if (context.hasErrors()) { + return; + } if (!response.isSuccess()) { throw TisException.create(response.getMessage()); } diff --git a/tis-incr/tis-chunjun-base-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/connector/ChunjunSinkFactory.java b/tis-incr/tis-chunjun-base-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/connector/ChunjunSinkFactory.java index 72eb19e20..13bb46fa1 100644 --- a/tis-incr/tis-chunjun-base-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/connector/ChunjunSinkFactory.java +++ b/tis-incr/tis-chunjun-base-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/connector/ChunjunSinkFactory.java @@ -601,7 +601,7 @@ public IStreamTableMeta load(String tableName) throws Exception { final BasicDataSourceFactory ds = (BasicDataSourceFactory) writer.getDataSourceFactory(); // 初始化表RDBMS的表,如果表不存在就创建表 - // DataxWriter.process(dataXName, tableName, ds.getJdbcUrls()); + DataxWriter.process(dataXName, tableName, ds.getJdbcUrls()); final List colsMeta = ds.getTableMetadata(true, EntityName.parse(tableName)) .stream().map((c) -> new HdfsColMeta(c.getName(), c.isNullable(), c.isPk(), c.getType())) .collect(Collectors.toList());