Skip to content

Commit

Permalink
add param 'connectTimeout' , 'socketTimeout' , 'retries' datavane/tis…
Browse files Browse the repository at this point in the history
  • Loading branch information
baisui1981 committed Feb 26, 2024
1 parent 7db0240 commit 1807a07
Show file tree
Hide file tree
Showing 11 changed files with 333 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.qlangtech.tis.plugin.ds.CMeta;
import com.qlangtech.tis.plugin.ds.DataSourceMeta;
import com.qlangtech.tis.plugin.ds.DataType;
import com.qlangtech.tis.plugin.ds.DataTypeMeta;
import com.qlangtech.tis.plugin.ds.ISelectedTab;
import com.qlangtech.tis.plugin.ds.doris.DorisSourceFactory;
import com.qlangtech.tis.runtime.module.misc.IFieldErrorHandler;
Expand Down Expand Up @@ -212,7 +213,6 @@ protected DorisType convertType(CMeta col) {
}



public static class DorisType implements Serializable {
public final DataType type;
final String token;
Expand Down Expand Up @@ -269,8 +269,9 @@ public DorisType varcharType(DataType type) {
// 原因:varchar(n) 再mysql中的n是字符数量,doris中的字节数量,所以如果在mysql中是varchar(n)在doris中varchar(3*N)
// 三倍,doris中是按照utf-8字节数计算的
int colSize = type.getColumnSize();
if (colSize < 1) {
colSize = 1000;
if (colSize < 1 || colSize > 1500) {
// https://doris.apache.org/docs/1.2/sql-manual/sql-reference/Data-Types/STRING/
return new DorisType(type, "STRING");
}
return new DorisType(type, "VARCHAR(" + Math.min(colSize * 3, 65000) + ")");
}
Expand All @@ -288,8 +289,13 @@ public DorisType floatType(DataType type) {
@Override
public final DorisType decimalType(DataType type) {
// doris or starRocks precision 不能超过超过半27
int precision = type.getColumnSize();
if (precision < 1 || precision > DataTypeMeta.DEFAULT_DECIMAL_PRECISION) {
precision = DataTypeMeta.DEFAULT_DECIMAL_PRECISION;
}

return new DorisType(type,
getDecimalToken() + "(" + Math.min(type.getColumnSize(), 27) + "," + (type.getDecimalDigits() != null ?
getDecimalToken() + "(" + precision + "," + (type.getDecimalDigits() != null ?
type.getDecimalDigits() : 0) + ")");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.qlangtech.tis.plugin.ds.CMeta;
import com.qlangtech.tis.plugin.ds.DataDumpers;
import com.qlangtech.tis.plugin.ds.DataType;
import com.qlangtech.tis.plugin.ds.DataTypeMeta;
import com.qlangtech.tis.plugin.ds.IDataSourceDumper;
import com.qlangtech.tis.plugin.ds.TISTable;
import com.qlangtech.tis.plugin.ds.TableNotFoundException;
Expand Down Expand Up @@ -243,7 +244,7 @@ private String convertType(CMeta col) {
if (type.getColumnSize() > 0) {
return "DECIMAL(" + type.getColumnSize() + "," + type.getDecimalDigits() + ")";
} else {
return "DECIMAL";
return "DECIMAL(" + DataTypeMeta.DEFAULT_DECIMAL_PRECISION + ",0)";
}
}
case DATE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
import java.util.stream.Collectors;

/**
*
* WRITER extends BasicDataXRdbmsWriter, DS extends BasicDataSourceFactory
*
* @author: 百岁(baisui@qlangtech.com)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package com.qlangtech.plugins.incr.flink.chunjun.doris.sink;

import com.alibaba.citrus.turbine.Context;
import com.alibaba.fastjson.JSONObject;
import com.dtstack.chunjun.conf.OperatorConf;
import com.dtstack.chunjun.conf.SyncConf;
Expand All @@ -37,6 +38,9 @@
import com.qlangtech.tis.extension.Descriptor;
import com.qlangtech.tis.extension.TISExtension;
import com.qlangtech.tis.plugin.IEndTypeGetter;
import com.qlangtech.tis.plugin.annotation.FormField;
import com.qlangtech.tis.plugin.annotation.FormFieldType;
import com.qlangtech.tis.plugin.annotation.Validator;
import com.qlangtech.tis.plugin.datax.SelectedTab;
import com.qlangtech.tis.plugin.datax.SelectedTabExtend;
import com.qlangtech.tis.plugin.datax.common.BasicDataXRdbmsWriter;
Expand All @@ -48,6 +52,7 @@
import com.qlangtech.tis.plugin.ds.doris.DorisSourceFactory;
import com.qlangtech.tis.plugins.incr.flink.chunjun.sink.SinkTabPropsExtends;
import com.qlangtech.tis.plugins.incr.flink.connector.ChunjunSinkFactory;
import com.qlangtech.tis.runtime.module.misc.IFieldErrorHandler;
import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
Expand All @@ -60,12 +65,29 @@
import java.util.Map;
import java.util.Properties;

//import static com.dtstack.chunjun.connector.doris.options.DorisKeys.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT;
//import static com.dtstack.chunjun.connector.doris.options.DorisKeys.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT;
//import static com.dtstack.chunjun.connector.doris.options.DorisKeys.DORIS_REQUEST_RETRIES_DEFAULT;

/**
* https://dtstack.github.io/chunjun/documents/zh/ChunJun%E8%BF%9E%E6%8E%A5%E5%99%A8@doris@dorisbatch-sink?lang=zh
*
* @author: 百岁(baisui@qlangtech.com)
* @create: 2022-08-15 14:32
**/
public class ChunjunDorisSinkFactory extends ChunjunSinkFactory {


@FormField(ordinal = 13, advance = true, type = FormFieldType.INT_NUMBER, validate = {Validator.require, Validator.integer})
public Integer connectTimeout;// = loadConf.getRequestConnectTimeoutMs() == null ? DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT : loadConf.getRequestConnectTimeoutMs();
@FormField(ordinal = 14, advance = true, type = FormFieldType.INT_NUMBER, validate = {Validator.require, Validator.integer})
public Integer socketTimeout;// = loadConf.getRequestReadTimeoutMs() == null ? DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT : loadConf.getRequestReadTimeoutMs();
@FormField(ordinal = 15, advance = true, type = FormFieldType.INT_NUMBER, validate = {Validator.require, Validator.integer})
public Integer retries;// = loadConf.getRequestRetries() == null ? DORIS_REQUEST_RETRIES_DEFAULT : loadConf.getRequestRetries();

//


@Override
protected Class<? extends JdbcDialect> getJdbcDialectClass() {
throw new UnsupportedOperationException();
Expand Down Expand Up @@ -109,6 +131,14 @@ protected void setParameter(BasicDataSourceFactory dsFactory

params.put(DorisKeys.DATABASE_KEY, dsFactory.dbName);
params.put(DorisKeys.TABLE_KEY, targetTabName);
// params.put()

// int connectTimeout = loadConf.getRequestConnectTimeoutMs() == null ? DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT : loadConf.getRequestConnectTimeoutMs();
// int socketTimeout = loadConf.getRequestReadTimeoutMs() == null ? DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT : loadConf.getRequestReadTimeoutMs();
// int retries = loadConf.getRequestRetries() == null ? DORIS_REQUEST_RETRIES_DEFAULT : loadConf.getRequestRetries();
params.put(DorisKeys.REQUEST_CONNECT_TIMEOUT_MS_KEY, this.connectTimeout);
params.put(DorisKeys.REQUEST_READ_TIMEOUT_MS_KEY, this.socketTimeout);
params.put(DorisKeys.REQUEST_RETRIES_KEY, this.retries);

DorisSelectedTab dorisTab = (DorisSelectedTab) tab;
if (dorisTab.seqKey.isOn()) {
Expand Down Expand Up @@ -217,6 +247,32 @@ protected IEndTypeGetter.EndType getTargetType() {
return IEndTypeGetter.EndType.Doris;
}

public boolean validateConnectTimeout(IFieldErrorHandler msgHandler, Context context, String fieldName, String value) {

if (Integer.parseInt(value) < 1000) {
msgHandler.addFieldError(context, fieldName, "不能小于1秒");
return false;
}

return true;
}

public boolean validateSocketTimeout(IFieldErrorHandler msgHandler, Context context, String fieldName, String value) {
if (Integer.parseInt(value) < 1000) {
msgHandler.addFieldError(context, fieldName, "不能小于1秒");
return false;
}
return true;
}

public boolean validateRetries(IFieldErrorHandler msgHandler, Context context, String fieldName, String value) {
if (Integer.parseInt(value) < 1) {
msgHandler.addFieldError(context, fieldName, "不能小于1次");
return false;
}
return true;
}

@Override
public Descriptor<SelectedTabExtend> getSelectedTableExtendDescriptor() {
// return TIS.get().getDescriptor(DorisTabProps.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.qlangtech.tis.plugins.incr.flink.chunjun.doris.sink;

import static com.dtstack.chunjun.connector.doris.options.DorisKeys.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT;
import static com.dtstack.chunjun.connector.doris.options.DorisKeys.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT;
import static com.dtstack.chunjun.connector.doris.options.DorisKeys.DORIS_REQUEST_RETRIES_DEFAULT;

/**
* @author: 百岁(baisui@qlangtech.com)
* @create: 2024-02-26 08:52
**/
public class ChunjunDorisCommon {
public static Integer dftConnectTimeout() {
return DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT;
}
public static Integer dftSocketTimeout() {
return DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT;
}
public static Integer dftRetries() {
return DORIS_REQUEST_RETRIES_DEFAULT;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"connectTimeout": {
"dftVal": "com.qlangtech.tis.plugins.incr.flink.chunjun.doris.sink.ChunjunDorisCommon.dftConnectTimeout()",
"help": "和服务端建立连接的超时时间,单位:ms"
},
"socketTimeout": {
"dftVal": "com.qlangtech.tis.plugins.incr.flink.chunjun.doris.sink.ChunjunDorisCommon.dftSocketTimeout()",
"help": "从服务端读取数据的超时时间,单位:ms"
},
"retries": {
"dftVal": "com.qlangtech.tis.plugins.incr.flink.chunjun.doris.sink.ChunjunDorisCommon.dftRetries()",
"help": "从服务端读取数据失败最大重试次数"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/

import com.qlangtech.plugins.incr.flink.chunjun.doris.sink.TestChunjunDorisSinkFactory;
import com.qlangtech.plugins.incr.flink.chunjun.doris.sink.TestChunjunDorisSinkFactoryWithoutDocker;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;

Expand All @@ -25,6 +26,6 @@
* @create: 2022-09-02 10:31
**/
@RunWith(Suite.class)
@Suite.SuiteClasses({TestChunjunDorisSinkFactory.class})
@Suite.SuiteClasses({TestChunjunDorisSinkFactory.class, TestChunjunDorisSinkFactoryWithoutDocker.class})
public class TestAll {
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.qlangtech.tis.coredefine.module.action.TargetResName;
import com.qlangtech.tis.datax.IStreamTableMeataCreator;
import com.qlangtech.tis.plugin.IEndTypeGetter;
import com.qlangtech.tis.plugin.common.PluginDesc;
import com.qlangtech.tis.plugin.datax.SelectedTab;
import com.qlangtech.tis.plugin.datax.common.BasicDataXRdbmsWriter;
import com.qlangtech.tis.plugin.datax.doris.DataXDorisWriter;
Expand Down Expand Up @@ -202,6 +203,7 @@ protected BasicDataSourceFactory getDsFactory() {
// }



/**
* https://doris.apache.org/docs/data-operate/import/import-way/stream-load-manual
* https://github.com/apache/doris/blob/1b0b5b5f0940f37811fc9bdce8d148766e46f6cb/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD.md
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.qlangtech.plugins.incr.flink.chunjun.doris.sink;

import com.qlangtech.tis.plugin.common.PluginDesc;
import org.junit.Test;

/**
* @author: 百岁(baisui@qlangtech.com)
* @create: 2024-02-26 08:50
**/
public class TestChunjunDorisSinkFactoryWithoutDocker {
@Test
public void testDescJsonGenerate(){
PluginDesc.testDescGenerate(ChunjunDorisSinkFactory.class, "chunjun-doris-sink-descriptor.json");
}
}
Loading

0 comments on commit 1807a07

Please sign in to comment.