diff --git a/tis-datax/tis-datax-doris-plugin/src/main/java/com/qlangtech/tis/plugin/datax/doris/BasicDorisWriter.java b/tis-datax/tis-datax-doris-plugin/src/main/java/com/qlangtech/tis/plugin/datax/doris/BasicDorisWriter.java index a9c91b516..ce4f4b8b1 100644 --- a/tis-datax/tis-datax-doris-plugin/src/main/java/com/qlangtech/tis/plugin/datax/doris/BasicDorisWriter.java +++ b/tis-datax/tis-datax-doris-plugin/src/main/java/com/qlangtech/tis/plugin/datax/doris/BasicDorisWriter.java @@ -31,6 +31,7 @@ import com.qlangtech.tis.plugin.ds.CMeta; import com.qlangtech.tis.plugin.ds.DataSourceMeta; import com.qlangtech.tis.plugin.ds.DataType; +import com.qlangtech.tis.plugin.ds.DataTypeMeta; import com.qlangtech.tis.plugin.ds.ISelectedTab; import com.qlangtech.tis.plugin.ds.doris.DorisSourceFactory; import com.qlangtech.tis.runtime.module.misc.IFieldErrorHandler; @@ -212,7 +213,6 @@ protected DorisType convertType(CMeta col) { } - public static class DorisType implements Serializable { public final DataType type; final String token; @@ -269,8 +269,9 @@ public DorisType varcharType(DataType type) { // 原因:varchar(n) 再mysql中的n是字符数量,doris中的字节数量,所以如果在mysql中是varchar(n)在doris中varchar(3*N) // 三倍,doris中是按照utf-8字节数计算的 int colSize = type.getColumnSize(); - if (colSize < 1) { - colSize = 1000; + if (colSize < 1 || colSize > 1500) { + // https://doris.apache.org/docs/1.2/sql-manual/sql-reference/Data-Types/STRING/ + return new DorisType(type, "STRING"); } return new DorisType(type, "VARCHAR(" + Math.min(colSize * 3, 65000) + ")"); } @@ -288,8 +289,13 @@ public DorisType floatType(DataType type) { @Override public final DorisType decimalType(DataType type) { // doris or starRocks precision 不能超过超过半27 + int precision = type.getColumnSize(); + if (precision < 1 || precision > DataTypeMeta.DEFAULT_DECIMAL_PRECISION) { + precision = DataTypeMeta.DEFAULT_DECIMAL_PRECISION; + } + return new DorisType(type, - getDecimalToken() + "(" + Math.min(type.getColumnSize(), 27) + "," + (type.getDecimalDigits() != null ? + getDecimalToken() + "(" + precision + "," + (type.getDecimalDigits() != null ? type.getDecimalDigits() : 0) + ")"); } diff --git a/tis-datax/tis-ds-mysql-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataxMySQLWriter.java b/tis-datax/tis-ds-mysql-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataxMySQLWriter.java index e6456f106..f62d0ebda 100644 --- a/tis-datax/tis-ds-mysql-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataxMySQLWriter.java +++ b/tis-datax/tis-ds-mysql-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataxMySQLWriter.java @@ -35,6 +35,7 @@ import com.qlangtech.tis.plugin.ds.CMeta; import com.qlangtech.tis.plugin.ds.DataDumpers; import com.qlangtech.tis.plugin.ds.DataType; +import com.qlangtech.tis.plugin.ds.DataTypeMeta; import com.qlangtech.tis.plugin.ds.IDataSourceDumper; import com.qlangtech.tis.plugin.ds.TISTable; import com.qlangtech.tis.plugin.ds.TableNotFoundException; @@ -243,7 +244,7 @@ private String convertType(CMeta col) { if (type.getColumnSize() > 0) { return "DECIMAL(" + type.getColumnSize() + "," + type.getDecimalDigits() + ")"; } else { - return "DECIMAL"; + return "DECIMAL(" + DataTypeMeta.DEFAULT_DECIMAL_PRECISION + ",0)"; } } case DATE: diff --git a/tis-incr/tis-chunjun-base-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/connector/ChunjunSinkFactory.java b/tis-incr/tis-chunjun-base-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/connector/ChunjunSinkFactory.java index b15a61db2..81fcd63bc 100644 --- a/tis-incr/tis-chunjun-base-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/connector/ChunjunSinkFactory.java +++ b/tis-incr/tis-chunjun-base-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/connector/ChunjunSinkFactory.java @@ -100,6 +100,7 @@ import java.util.stream.Collectors; /** + * * WRITER extends BasicDataXRdbmsWriter, DS extends BasicDataSourceFactory * * @author: 百岁(baisui@qlangtech.com) diff --git a/tis-incr/tis-flink-chunjun-doris-plugin/src/main/java/com/qlangtech/plugins/incr/flink/chunjun/doris/sink/ChunjunDorisSinkFactory.java b/tis-incr/tis-flink-chunjun-doris-plugin/src/main/java/com/qlangtech/plugins/incr/flink/chunjun/doris/sink/ChunjunDorisSinkFactory.java index 845ee232a..2911932e1 100644 --- a/tis-incr/tis-flink-chunjun-doris-plugin/src/main/java/com/qlangtech/plugins/incr/flink/chunjun/doris/sink/ChunjunDorisSinkFactory.java +++ b/tis-incr/tis-flink-chunjun-doris-plugin/src/main/java/com/qlangtech/plugins/incr/flink/chunjun/doris/sink/ChunjunDorisSinkFactory.java @@ -18,6 +18,7 @@ package com.qlangtech.plugins.incr.flink.chunjun.doris.sink; +import com.alibaba.citrus.turbine.Context; import com.alibaba.fastjson.JSONObject; import com.dtstack.chunjun.conf.OperatorConf; import com.dtstack.chunjun.conf.SyncConf; @@ -37,6 +38,9 @@ import com.qlangtech.tis.extension.Descriptor; import com.qlangtech.tis.extension.TISExtension; import com.qlangtech.tis.plugin.IEndTypeGetter; +import com.qlangtech.tis.plugin.annotation.FormField; +import com.qlangtech.tis.plugin.annotation.FormFieldType; +import com.qlangtech.tis.plugin.annotation.Validator; import com.qlangtech.tis.plugin.datax.SelectedTab; import com.qlangtech.tis.plugin.datax.SelectedTabExtend; import com.qlangtech.tis.plugin.datax.common.BasicDataXRdbmsWriter; @@ -48,6 +52,7 @@ import com.qlangtech.tis.plugin.ds.doris.DorisSourceFactory; import com.qlangtech.tis.plugins.incr.flink.chunjun.sink.SinkTabPropsExtends; import com.qlangtech.tis.plugins.incr.flink.connector.ChunjunSinkFactory; +import com.qlangtech.tis.runtime.module.misc.IFieldErrorHandler; import org.apache.commons.collections.CollectionUtils; import org.apache.flink.api.common.io.OutputFormat; import org.apache.flink.streaming.api.datastream.DataStream; @@ -60,12 +65,29 @@ import java.util.Map; import java.util.Properties; +//import static com.dtstack.chunjun.connector.doris.options.DorisKeys.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT; +//import static com.dtstack.chunjun.connector.doris.options.DorisKeys.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT; +//import static com.dtstack.chunjun.connector.doris.options.DorisKeys.DORIS_REQUEST_RETRIES_DEFAULT; + /** + * https://dtstack.github.io/chunjun/documents/zh/ChunJun%E8%BF%9E%E6%8E%A5%E5%99%A8@doris@dorisbatch-sink?lang=zh + * * @author: 百岁(baisui@qlangtech.com) * @create: 2022-08-15 14:32 **/ public class ChunjunDorisSinkFactory extends ChunjunSinkFactory { + + @FormField(ordinal = 13, advance = true, type = FormFieldType.INT_NUMBER, validate = {Validator.require, Validator.integer}) + public Integer connectTimeout;// = loadConf.getRequestConnectTimeoutMs() == null ? DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT : loadConf.getRequestConnectTimeoutMs(); + @FormField(ordinal = 14, advance = true, type = FormFieldType.INT_NUMBER, validate = {Validator.require, Validator.integer}) + public Integer socketTimeout;// = loadConf.getRequestReadTimeoutMs() == null ? DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT : loadConf.getRequestReadTimeoutMs(); + @FormField(ordinal = 15, advance = true, type = FormFieldType.INT_NUMBER, validate = {Validator.require, Validator.integer}) + public Integer retries;// = loadConf.getRequestRetries() == null ? DORIS_REQUEST_RETRIES_DEFAULT : loadConf.getRequestRetries(); + + // + + @Override protected Class getJdbcDialectClass() { throw new UnsupportedOperationException(); @@ -109,6 +131,14 @@ protected void setParameter(BasicDataSourceFactory dsFactory params.put(DorisKeys.DATABASE_KEY, dsFactory.dbName); params.put(DorisKeys.TABLE_KEY, targetTabName); + // params.put() + +// int connectTimeout = loadConf.getRequestConnectTimeoutMs() == null ? DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT : loadConf.getRequestConnectTimeoutMs(); +// int socketTimeout = loadConf.getRequestReadTimeoutMs() == null ? DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT : loadConf.getRequestReadTimeoutMs(); +// int retries = loadConf.getRequestRetries() == null ? DORIS_REQUEST_RETRIES_DEFAULT : loadConf.getRequestRetries(); + params.put(DorisKeys.REQUEST_CONNECT_TIMEOUT_MS_KEY, this.connectTimeout); + params.put(DorisKeys.REQUEST_READ_TIMEOUT_MS_KEY, this.socketTimeout); + params.put(DorisKeys.REQUEST_RETRIES_KEY, this.retries); DorisSelectedTab dorisTab = (DorisSelectedTab) tab; if (dorisTab.seqKey.isOn()) { @@ -217,6 +247,32 @@ protected IEndTypeGetter.EndType getTargetType() { return IEndTypeGetter.EndType.Doris; } + public boolean validateConnectTimeout(IFieldErrorHandler msgHandler, Context context, String fieldName, String value) { + + if (Integer.parseInt(value) < 1000) { + msgHandler.addFieldError(context, fieldName, "不能小于1秒"); + return false; + } + + return true; + } + + public boolean validateSocketTimeout(IFieldErrorHandler msgHandler, Context context, String fieldName, String value) { + if (Integer.parseInt(value) < 1000) { + msgHandler.addFieldError(context, fieldName, "不能小于1秒"); + return false; + } + return true; + } + + public boolean validateRetries(IFieldErrorHandler msgHandler, Context context, String fieldName, String value) { + if (Integer.parseInt(value) < 1) { + msgHandler.addFieldError(context, fieldName, "不能小于1次"); + return false; + } + return true; + } + @Override public Descriptor getSelectedTableExtendDescriptor() { // return TIS.get().getDescriptor(DorisTabProps.class); diff --git a/tis-incr/tis-flink-chunjun-doris-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/chunjun/doris/sink/ChunjunDorisCommon.java b/tis-incr/tis-flink-chunjun-doris-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/chunjun/doris/sink/ChunjunDorisCommon.java new file mode 100644 index 000000000..84b5ca7b0 --- /dev/null +++ b/tis-incr/tis-flink-chunjun-doris-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/chunjun/doris/sink/ChunjunDorisCommon.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.qlangtech.tis.plugins.incr.flink.chunjun.doris.sink; + +import static com.dtstack.chunjun.connector.doris.options.DorisKeys.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT; +import static com.dtstack.chunjun.connector.doris.options.DorisKeys.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT; +import static com.dtstack.chunjun.connector.doris.options.DorisKeys.DORIS_REQUEST_RETRIES_DEFAULT; + +/** + * @author: 百岁(baisui@qlangtech.com) + * @create: 2024-02-26 08:52 + **/ +public class ChunjunDorisCommon { + public static Integer dftConnectTimeout() { + return DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT; + } + public static Integer dftSocketTimeout() { + return DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT; + } + public static Integer dftRetries() { + return DORIS_REQUEST_RETRIES_DEFAULT; + } +} diff --git a/tis-incr/tis-flink-chunjun-doris-plugin/src/main/resources/com/qlangtech/plugins/incr/flink/chunjun/doris/sink/ChunjunDorisSinkFactory.json b/tis-incr/tis-flink-chunjun-doris-plugin/src/main/resources/com/qlangtech/plugins/incr/flink/chunjun/doris/sink/ChunjunDorisSinkFactory.json new file mode 100644 index 000000000..ebab68f7f --- /dev/null +++ b/tis-incr/tis-flink-chunjun-doris-plugin/src/main/resources/com/qlangtech/plugins/incr/flink/chunjun/doris/sink/ChunjunDorisSinkFactory.json @@ -0,0 +1,14 @@ +{ + "connectTimeout": { + "dftVal": "com.qlangtech.tis.plugins.incr.flink.chunjun.doris.sink.ChunjunDorisCommon.dftConnectTimeout()", + "help": "和服务端建立连接的超时时间,单位:ms" + }, + "socketTimeout": { + "dftVal": "com.qlangtech.tis.plugins.incr.flink.chunjun.doris.sink.ChunjunDorisCommon.dftSocketTimeout()", + "help": "从服务端读取数据的超时时间,单位:ms" + }, + "retries": { + "dftVal": "com.qlangtech.tis.plugins.incr.flink.chunjun.doris.sink.ChunjunDorisCommon.dftRetries()", + "help": "从服务端读取数据失败最大重试次数" + } +} diff --git a/tis-incr/tis-flink-chunjun-doris-plugin/src/test/java/TestAll.java b/tis-incr/tis-flink-chunjun-doris-plugin/src/test/java/TestAll.java index a600c42bb..56f18d729 100644 --- a/tis-incr/tis-flink-chunjun-doris-plugin/src/test/java/TestAll.java +++ b/tis-incr/tis-flink-chunjun-doris-plugin/src/test/java/TestAll.java @@ -17,6 +17,7 @@ */ import com.qlangtech.plugins.incr.flink.chunjun.doris.sink.TestChunjunDorisSinkFactory; +import com.qlangtech.plugins.incr.flink.chunjun.doris.sink.TestChunjunDorisSinkFactoryWithoutDocker; import org.junit.runner.RunWith; import org.junit.runners.Suite; @@ -25,6 +26,6 @@ * @create: 2022-09-02 10:31 **/ @RunWith(Suite.class) -@Suite.SuiteClasses({TestChunjunDorisSinkFactory.class}) +@Suite.SuiteClasses({TestChunjunDorisSinkFactory.class, TestChunjunDorisSinkFactoryWithoutDocker.class}) public class TestAll { } diff --git a/tis-incr/tis-flink-chunjun-doris-plugin/src/test/java/com/qlangtech/plugins/incr/flink/chunjun/doris/sink/TestChunjunDorisSinkFactory.java b/tis-incr/tis-flink-chunjun-doris-plugin/src/test/java/com/qlangtech/plugins/incr/flink/chunjun/doris/sink/TestChunjunDorisSinkFactory.java index f8943a50e..b5a58180f 100644 --- a/tis-incr/tis-flink-chunjun-doris-plugin/src/test/java/com/qlangtech/plugins/incr/flink/chunjun/doris/sink/TestChunjunDorisSinkFactory.java +++ b/tis-incr/tis-flink-chunjun-doris-plugin/src/test/java/com/qlangtech/plugins/incr/flink/chunjun/doris/sink/TestChunjunDorisSinkFactory.java @@ -28,6 +28,7 @@ import com.qlangtech.tis.coredefine.module.action.TargetResName; import com.qlangtech.tis.datax.IStreamTableMeataCreator; import com.qlangtech.tis.plugin.IEndTypeGetter; +import com.qlangtech.tis.plugin.common.PluginDesc; import com.qlangtech.tis.plugin.datax.SelectedTab; import com.qlangtech.tis.plugin.datax.common.BasicDataXRdbmsWriter; import com.qlangtech.tis.plugin.datax.doris.DataXDorisWriter; @@ -202,6 +203,7 @@ protected BasicDataSourceFactory getDsFactory() { // } + /** * https://doris.apache.org/docs/data-operate/import/import-way/stream-load-manual * https://github.com/apache/doris/blob/1b0b5b5f0940f37811fc9bdce8d148766e46f6cb/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md diff --git a/tis-incr/tis-flink-chunjun-doris-plugin/src/test/java/com/qlangtech/plugins/incr/flink/chunjun/doris/sink/TestChunjunDorisSinkFactoryWithoutDocker.java b/tis-incr/tis-flink-chunjun-doris-plugin/src/test/java/com/qlangtech/plugins/incr/flink/chunjun/doris/sink/TestChunjunDorisSinkFactoryWithoutDocker.java new file mode 100644 index 000000000..5488fc6ae --- /dev/null +++ b/tis-incr/tis-flink-chunjun-doris-plugin/src/test/java/com/qlangtech/plugins/incr/flink/chunjun/doris/sink/TestChunjunDorisSinkFactoryWithoutDocker.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.qlangtech.plugins.incr.flink.chunjun.doris.sink; + +import com.qlangtech.tis.plugin.common.PluginDesc; +import org.junit.Test; + +/** + * @author: 百岁(baisui@qlangtech.com) + * @create: 2024-02-26 08:50 + **/ +public class TestChunjunDorisSinkFactoryWithoutDocker { + @Test + public void testDescJsonGenerate(){ + PluginDesc.testDescGenerate(ChunjunDorisSinkFactory.class, "chunjun-doris-sink-descriptor.json"); + } +} diff --git a/tis-incr/tis-flink-chunjun-doris-plugin/src/test/resources/com/qlangtech/plugins/incr/flink/chunjun/doris/sink/chunjun-doris-sink-descriptor.json b/tis-incr/tis-flink-chunjun-doris-plugin/src/test/resources/com/qlangtech/plugins/incr/flink/chunjun/doris/sink/chunjun-doris-sink-descriptor.json new file mode 100644 index 000000000..3853958a2 --- /dev/null +++ b/tis-incr/tis-flink-chunjun-doris-plugin/src/test/resources/com/qlangtech/plugins/incr/flink/chunjun/doris/sink/chunjun-doris-sink-descriptor.json @@ -0,0 +1,170 @@ +{ + "com.qlangtech.plugins.incr.flink.chunjun.doris.sink.ChunjunDorisSinkFactory":{ + "attrs":[ + { + "describable":false, + "eprops":{ + "asyncHelp":true, + "dftVal":"at-least-once", + "enum":[ + { + "label":"Exactly-Once", + "val":"exactly-once" + }, + { + "label":"At-Least-Once", + "val":"at-least-once" + } + ] + }, + "key":"semantic", + "ord":1, + "pk":false, + "required":true, + "type":5 + }, + { + "describable":true, + "descriptors":{ + "com.qlangtech.tis.plugins.incr.flink.chunjun.script.ChunjunSqlType":{ + "attrs":[ + + ], + "containAdvance":false, + "displayName":"SQL", + "extendPoint":"com.qlangtech.tis.plugins.incr.flink.chunjun.script.ChunjunStreamScriptType", + "extractProps":{ + "notebook":{ + "ability":false, + "activate":false + } + }, + "impl":"com.qlangtech.tis.plugins.incr.flink.chunjun.script.ChunjunSqlType", + "implUrl":"http://tis.pub/docs/plugin/plugins/#comqlangtechtispluginsincrflinkchunjunscriptchunjunsqltype", + "veriflable":false + }, + "com.qlangtech.tis.plugins.incr.flink.chunjun.script.StreamApiScript":{ + "attrs":[ + + ], + "containAdvance":false, + "displayName":"StreamAPI", + "extendPoint":"com.qlangtech.tis.plugins.incr.flink.chunjun.script.ChunjunStreamScriptType", + "extractProps":{ + "notebook":{ + "ability":false, + "activate":false + } + }, + "impl":"com.qlangtech.tis.plugins.incr.flink.chunjun.script.StreamApiScript", + "implUrl":"http://tis.pub/docs/plugin/plugins/#comqlangtechtispluginsincrflinkchunjunscriptstreamapiscript", + "veriflable":false + } + }, + "eprops":{ + "asyncHelp":true, + "dftVal":"StreamAPI", + "label":"脚本类型" + }, + "extendPoint":"com.qlangtech.tis.plugins.incr.flink.chunjun.script.ChunjunStreamScriptType", + "extensible":false, + "key":"scriptType", + "ord":4, + "pk":false, + "required":true, + "type":1 + }, + { + "describable":false, + "eprops":{ + "asyncHelp":true, + "dftVal":5000 + }, + "key":"batchSize", + "ord":6, + "pk":false, + "required":true, + "type":4 + }, + { + "describable":false, + "eprops":{ + "dftVal":10000, + "help":"\"the flush interval mills, over this time, asynchronous threads will flush data. The default value is 10s." + }, + "key":"flushIntervalMills", + "ord":9, + "pk":false, + "required":true, + "type":4 + }, + { + "describable":false, + "eprops":{ + "dftVal":1, + "help":"sink 并行度" + }, + "key":"parallelism", + "ord":12, + "pk":false, + "required":true, + "type":4 + }, + { + "advance":true, + "describable":false, + "eprops":{ + "dftVal":30000, + "help":"和服务端建立连接的超时时间,单位:ms" + }, + "key":"connectTimeout", + "ord":13, + "pk":false, + "required":true, + "type":4 + }, + { + "advance":true, + "describable":false, + "eprops":{ + "dftVal":30000, + "help":"从服务端读取数据的超时时间,单位:ms" + }, + "key":"socketTimeout", + "ord":14, + "pk":false, + "required":true, + "type":4 + }, + { + "advance":true, + "describable":false, + "eprops":{ + "dftVal":3, + "help":"从服务端读取数据失败最大重试次数" + }, + "key":"retries", + "ord":15, + "pk":false, + "required":true, + "type":4 + } + ], + "containAdvance":true, + "displayName":"Chunjun-Sink-Doris", + "extendPoint":"com.qlangtech.tis.plugin.incr.TISSinkFactory", + "extractProps":{ + "endType":"doris", + "extendSelectedTabProp":false, + "notebook":{ + "ability":false, + "activate":false + }, + "supportIcon":true, + "targetType":"doris" + }, + "impl":"com.qlangtech.plugins.incr.flink.chunjun.doris.sink.ChunjunDorisSinkFactory", + "implUrl":"http://tis.pub/docs/plugin/plugins/#comqlangtechpluginsincrflinkchunjundorissinkchunjundorissinkfactory", + "veriflable":false + } +} diff --git a/tis-incr/tis-realtime-flink/src/main/java/com/qlangtech/tis/plugins/incr/flink/cdc/AbstractRowDataMapper.java b/tis-incr/tis-realtime-flink/src/main/java/com/qlangtech/tis/plugins/incr/flink/cdc/AbstractRowDataMapper.java index ad766b389..a87bfb0e3 100644 --- a/tis-incr/tis-realtime-flink/src/main/java/com/qlangtech/tis/plugins/incr/flink/cdc/AbstractRowDataMapper.java +++ b/tis-incr/tis-realtime-flink/src/main/java/com/qlangtech/tis/plugins/incr/flink/cdc/AbstractRowDataMapper.java @@ -24,6 +24,7 @@ import com.qlangtech.tis.coredefine.module.action.TargetResName; import com.qlangtech.tis.datax.IStreamTableMeataCreator; import com.qlangtech.tis.plugin.ds.DataType; +import com.qlangtech.tis.plugin.ds.DataTypeMeta; import com.qlangtech.tis.plugin.ds.IColMetaGetter; import com.qlangtech.tis.plugins.incr.flink.FlinkColMapper; import com.qlangtech.tis.realtime.BasicFlinkSourceHandle; @@ -203,10 +204,11 @@ public FlinkCol bigInt(DataType type) { public FlinkCol decimalType(DataType type) { + int precision = type.getColumnSize(); Integer scale = type.getDecimalDigits(); - if (precision < 1 || precision > 38) { - precision = 38; + if (precision < 1 || precision > DataTypeMeta.DEFAULT_DECIMAL_PRECISION) { + precision = DataTypeMeta.DEFAULT_DECIMAL_PRECISION; } try { return new FlinkCol(meta, type, //DataTypes.DECIMAL(precision, scale)