From 0052d8d503be6254e1a5c87ed85ce35882b9d0b9 Mon Sep 17 00:00:00 2001 From: FlechazoW <35768015+FlechazoW@users.noreply.github.com> Date: Thu, 27 Oct 2022 11:53:14 +0800 Subject: [PATCH] [Improve][Connector][Doris] Improve doris source. (#1359) --- .../doris/converter/DorisColumnConverter.java | 6 - .../converter/DorisHttpRowConverter.java | 6 - .../converter/DorisRowTypeConverter.java | 18 ++- .../DorisConnectFailedException.java | 4 - .../connector/doris/options/DorisConf.java | 18 --- .../doris/options/DorisConfBuilder.java | 14 +-- .../connector/doris/options/DorisKeys.java | 4 - .../connector/doris/options/DorisOptions.java | 6 - .../connector/doris/options/LoadConf.java | 4 - .../doris/options/LoadConfBuilder.java | 4 - .../chunjun/connector/doris/rest/Carrier.java | 6 - .../connector/doris/rest/DorisLoadClient.java | 6 - .../connector/doris/rest/DorisStreamLoad.java | 8 +- .../connector/doris/rest/FeRestService.java | 4 - .../connector/doris/rest/module/Backend.java | 4 - .../doris/rest/module/BackendRow.java | 4 - .../connector/doris/rest/module/Field.java | 4 - .../rest/module/PartitionDefinition.java | 4 - .../doris/rest/module/QueryPlan.java | 4 - .../doris/rest/module/RespContent.java | 4 - .../connector/doris/rest/module/Schema.java | 4 - .../connector/doris/rest/module/Tablet.java | 4 - .../doris/sink/DorisDynamicTableSink.java | 6 - .../doris/sink/DorisHttpOutputFormat.java | 22 +--- .../sink/DorisHttpOutputFormatBuilder.java | 6 - .../doris/sink/DorisSinkFactory.java | 113 +++++++++++------- .../doris/source/DorisSourceFactory.java | 39 ++++++ .../doris/table/DorisDynamicTableFactory.java | 11 +- .../chunjun/connector/jdbc/conf/JdbcConf.java | 68 +++++------ .../chunjun/connector/jdbc/util/JdbcUtil.java | 3 + 30 files changed, 166 insertions(+), 242 deletions(-) create mode 100644 chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/source/DorisSourceFactory.java diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/converter/DorisColumnConverter.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/converter/DorisColumnConverter.java index c757ed4bfa..137eeeadf3 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/converter/DorisColumnConverter.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/converter/DorisColumnConverter.java @@ -29,12 +29,6 @@ import java.util.List; import java.util.StringJoiner; -/** - * Company:www.dtstack.com. - * - * @author shitou - * @date 2021/11/10 - */ public class DorisColumnConverter extends AbstractRowConverter { diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/converter/DorisHttpRowConverter.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/converter/DorisHttpRowConverter.java index 3348f24e50..2d8934c391 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/converter/DorisHttpRowConverter.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/converter/DorisHttpRowConverter.java @@ -29,12 +29,6 @@ import java.util.StringJoiner; -/** - * Company: www.dtstack.com - * - * @author xuchao - * @date 2021-11-21 - */ public class DorisHttpRowConverter extends AbstractRowConverter { diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/converter/DorisRowTypeConverter.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/converter/DorisRowTypeConverter.java index e80d07c44d..bcb30ecd5a 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/converter/DorisRowTypeConverter.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/converter/DorisRowTypeConverter.java @@ -25,10 +25,6 @@ import java.util.Locale; -/** - * @author xuchao - * @date 2021-11-21 - */ public class DorisRowTypeConverter { public static DataType apply(String type) { @@ -38,31 +34,41 @@ public static DataType apply(String type) { return DataTypes.BOOLEAN(); case "TINYINT": return DataTypes.TINYINT(); + case "TINYINT UNSIGNED": case "SMALLINT": + return DataTypes.SMALLINT(); + case "SMALLINT UNSIGNED": case "MEDIUMINT": + case "MEDIUMINT UNSIGNED": case "INT": case "INTEGER": case "INT24": return DataTypes.INT(); + case "INT UNSIGNED": case "BIGINT": return DataTypes.BIGINT(); + case "BIGINT UNSIGNED": + return DataTypes.DECIMAL(20, 0); case "REAL": case "FLOAT": + case "FLOAT UNSIGNED": return DataTypes.FLOAT(); case "DECIMAL": + case "DECIMAL UNSIGNED": case "NUMERIC": case "DECIMALV2": return DataTypes.DECIMAL(38, 18); case "DOUBLE": + case "DOUBLE UNSIGNED": return DataTypes.DOUBLE(); case "CHAR": case "VARCHAR": case "STRING": - case "JSON": case "TINYTEXT": case "TEXT": case "MEDIUMTEXT": case "LONGTEXT": + case "JSON": case "ENUM": case "SET": return DataTypes.STRING(); @@ -81,8 +87,8 @@ public static DataType apply(String type) { case "LONGBLOB": case "BINARY": case "VARBINARY": - // BYTES 底层调用的是VARBINARY最大长度 case "GEOMETRY": + // BYTES 底层调用的是VARBINARY最大长度 return DataTypes.BYTES(); case "NULL_TYPE": case "NULL": diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/exception/DorisConnectFailedException.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/exception/DorisConnectFailedException.java index 76714b0301..38ba717f58 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/exception/DorisConnectFailedException.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/exception/DorisConnectFailedException.java @@ -18,10 +18,6 @@ package com.dtstack.chunjun.connector.doris.exception; -/** - * @author tiezhu@dtstack - * @date 2021/9/17 星期五 - */ public class DorisConnectFailedException extends RuntimeException { public DorisConnectFailedException(String username, String hostUrl, Throwable cause) { super(String.format("User [%s] connect to [%s] failed.", username, hostUrl), cause); diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisConf.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisConf.java index 1c4951e09d..694af33085 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisConf.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisConf.java @@ -30,22 +30,12 @@ import java.util.Objects; import java.util.Properties; -/** - * @author tiezhu@dtstack - * @date 2021/9/16 星期四 - */ public class DorisConf extends JdbcConf { private String database; private String table; - private String username; - - private String password; - - private String writeMode; - private List feNodes; private String url; @@ -102,14 +92,6 @@ public void setPassword(String password) { this.password = password; } - public String getWriteMode() { - return writeMode; - } - - public void setWriteMode(String writeMode) { - this.writeMode = writeMode; - } - public List getFeNodes() { return feNodes; } diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisConfBuilder.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisConfBuilder.java index f707522971..e9d3516ab6 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisConfBuilder.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisConfBuilder.java @@ -22,11 +22,8 @@ import java.util.Properties; import java.util.StringJoiner; -/** - * @author tiezhu@dtstack - * @date 2021/9/17 星期五 - */ public class DorisConfBuilder { + private final DorisConf dorisConf; public DorisConfBuilder() { @@ -58,11 +55,6 @@ public DorisConfBuilder setPassword(String password) { return this; } - public DorisConfBuilder setWriteMode(String writeMode) { - this.dorisConf.setWriteMode(writeMode); - return this; - } - public DorisConfBuilder setNameMapped(boolean needNameMapping) { this.dorisConf.setNameMapped(needNameMapping); return this; @@ -101,10 +93,6 @@ public DorisConfBuilder setWaitRetryMills(long waitRetryMills) { public DorisConf build() { StringJoiner errorMessage = new StringJoiner("\n"); - if (dorisConf.getFeNodes() == null || dorisConf.getFeNodes().isEmpty()) { - errorMessage.add("Doris FeNodes can not be empty!"); - } - if (dorisConf.getUsername() == null || dorisConf.getUsername().isEmpty()) { errorMessage.add("Doris Username can not be empty!"); } diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisKeys.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisKeys.java index 9a1742d4d6..6813879710 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisKeys.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisKeys.java @@ -18,10 +18,6 @@ package com.dtstack.chunjun.connector.doris.options; -/** - * @author tiezhu@dtstack - * @date 2021/9/17 星期五 - */ public final class DorisKeys { public static final String FIELD_DELIMITER_KEY = "fieldDelimiter"; diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisOptions.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisOptions.java index f79851d9c5..8641a294c7 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisOptions.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisOptions.java @@ -23,12 +23,6 @@ import java.util.List; -/** - * Company: www.dtstack.com - * - * @author xuchao - * @date 2021-11-21 - */ public class DorisOptions { public static final ConfigOption> FENODES = ConfigOptions.key("feNodes") diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/LoadConf.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/LoadConf.java index 63ba59bd7f..6c7d34f02a 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/LoadConf.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/LoadConf.java @@ -20,10 +20,6 @@ import java.io.Serializable; -/** - * @author tiezhu@dtstack - * @date 2021/9/18 星期六 - */ public class LoadConf implements Serializable { private static final Long serialVersionUID = 1L; diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/LoadConfBuilder.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/LoadConfBuilder.java index 7b414b10a0..789fb75fb9 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/LoadConfBuilder.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/LoadConfBuilder.java @@ -18,10 +18,6 @@ package com.dtstack.chunjun.connector.doris.options; -/** - * @author tiezhu@dtstack - * @date 2021/9/18 星期六 - */ public class LoadConfBuilder { private final LoadConf loadConf; diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/Carrier.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/Carrier.java index 95959d6168..8b70f19d93 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/Carrier.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/Carrier.java @@ -30,12 +30,6 @@ import java.util.StringJoiner; import java.util.stream.IntStream; -/** - * Company:www.dtstack.com. - * - * @author shitou - * @date 2022/1/17 - */ public class Carrier implements Serializable { private static final long serialVersionUID = 1L; private final List> insertContent; diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/DorisLoadClient.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/DorisLoadClient.java index e2843bb754..5d6a74d299 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/DorisLoadClient.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/DorisLoadClient.java @@ -50,12 +50,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -/** - * Company:www.dtstack.com. - * - * @author shitou - * @date 2021/12/21 - */ public class DorisLoadClient implements Serializable { private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(DorisLoadClient.class); diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/DorisStreamLoad.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/DorisStreamLoad.java index 60514db37b..373aaa2cab 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/DorisStreamLoad.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/DorisStreamLoad.java @@ -53,10 +53,6 @@ import java.util.UUID; import java.util.stream.Collectors; -/** - * @author tiezhu@dtstack.com - * @since 08/10/2021 Friday - */ public class DorisStreamLoad implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class); @@ -64,8 +60,8 @@ public class DorisStreamLoad implements Serializable { private static final List DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList("Success", "Publish Timeout")); private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load?"; - private String authEncoding; - private Properties streamLoadProp; + private final String authEncoding; + private final Properties streamLoadProp; private String hostPort; private DorisConf options; diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/FeRestService.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/FeRestService.java index e1226bc634..b803d7bb28 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/FeRestService.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/FeRestService.java @@ -71,10 +71,6 @@ import static com.dtstack.chunjun.connector.doris.options.DorisKeys.ILLEGAL_ARGUMENT_MESSAGE; import static com.dtstack.chunjun.connector.doris.options.DorisKeys.SHOULD_NOT_HAPPEN_MESSAGE; -/** - * @author tiezhu@dtstack - * @date 2021/9/17 星期五 - */ public class FeRestService implements Serializable { public static final int REST_RESPONSE_STATUS_OK = 200; diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/module/Backend.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/module/Backend.java index cb889da2c0..b3ed24599f 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/module/Backend.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/module/Backend.java @@ -23,10 +23,6 @@ import java.util.List; -/** - * @author tiezhu@dtstack.com - * @since 08/10/2021 Friday - */ @JsonIgnoreProperties(ignoreUnknown = true) public class Backend { diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/module/BackendRow.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/module/BackendRow.java index a306d23529..d831e85139 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/module/BackendRow.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/module/BackendRow.java @@ -22,10 +22,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.commons.lang3.builder.ToStringBuilder; -/** - * @author tiezhu@dtstack.com - * @since 08/10/2021 Friday - */ @JsonIgnoreProperties(ignoreUnknown = true) public class BackendRow { diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/module/Field.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/module/Field.java index 16275f04ce..c7a11383ac 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/module/Field.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/module/Field.java @@ -22,10 +22,6 @@ import java.util.Objects; -/** - * @author tiezhu@dtstack.com - * @since 08/10/2021 Friday - */ public class Field { private String name; private String type; diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/module/PartitionDefinition.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/module/PartitionDefinition.java index 05471d0000..daf4296748 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/module/PartitionDefinition.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/module/PartitionDefinition.java @@ -27,10 +27,6 @@ import java.util.Set; import java.util.StringJoiner; -/** - * @author tiezhu@dtstack - * @date 2021/9/18 星期六 - */ public class PartitionDefinition implements Serializable, Comparable { private final String database; private final String table; diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/module/QueryPlan.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/module/QueryPlan.java index 85ef287f76..6670a0a450 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/module/QueryPlan.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/module/QueryPlan.java @@ -21,10 +21,6 @@ import java.util.Map; import java.util.Objects; -/** - * @author tiezhu@dtstack.com - * @since 08/10/2021 Friday - */ public class QueryPlan { private int status; private String opaqued_query_plan; diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/module/RespContent.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/module/RespContent.java index 00a7212ff1..9ac00a4484 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/module/RespContent.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/module/RespContent.java @@ -23,10 +23,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -/** - * @author tiezhu@dtstack.com - * @since 08/10/2021 Friday - */ @JsonIgnoreProperties(ignoreUnknown = true) public class RespContent { diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/module/Schema.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/module/Schema.java index 8609b98a1d..ef0b084536 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/module/Schema.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/module/Schema.java @@ -24,10 +24,6 @@ import java.util.List; import java.util.Objects; -/** - * @author tiezhu@dtstack.com - * @since 08/10/2021 Friday - */ public class Schema { private int status = 0; private List properties; diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/module/Tablet.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/module/Tablet.java index 8f0d739ca0..a225deece8 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/module/Tablet.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/module/Tablet.java @@ -21,10 +21,6 @@ import java.util.List; import java.util.Objects; -/** - * @author tiezhu@dtstack.com - * @since 08/10/2021 Friday - */ public class Tablet { private List routing; private int version; diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisDynamicTableSink.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisDynamicTableSink.java index a74b796cdd..7d0a400402 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisDynamicTableSink.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisDynamicTableSink.java @@ -41,12 +41,6 @@ import java.util.Arrays; import java.util.List; -/** - * Company: www.dtstack.com - * - * @author xuchao - * @date 2021-11-21 - */ public class DorisDynamicTableSink extends JdbcDynamicTableSink { private final TableSchema physicalSchema; diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisHttpOutputFormat.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisHttpOutputFormat.java index 3580f6be7d..4065731f3b 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisHttpOutputFormat.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisHttpOutputFormat.java @@ -22,7 +22,6 @@ import com.dtstack.chunjun.connector.doris.rest.Carrier; import com.dtstack.chunjun.connector.doris.rest.DorisLoadClient; import com.dtstack.chunjun.connector.doris.rest.DorisStreamLoad; -import com.dtstack.chunjun.connector.doris.rest.FeRestService; import com.dtstack.chunjun.sink.format.BaseRichOutputFormat; import com.dtstack.chunjun.throwable.WriteRecordException; @@ -35,12 +34,7 @@ import java.util.Map; import java.util.Set; -/** - * use DorisStreamLoad to write data into doris - * - * @author tiezhu@dtstack - * @date 2021/9/16 星期四 - */ +/** use DorisStreamLoad to write data into doris */ public class DorisHttpOutputFormat extends BaseRichOutputFormat { private DorisConf options; private DorisLoadClient client; @@ -57,16 +51,6 @@ public void setColumns(List columns) { this.columns = columns; } - private String getBackend() throws IOException { - try { - // get be url from fe - return FeRestService.randomBackend(options); - } catch (IOException e) { - LOG.error("get backends info fail"); - throw new IOException(e); - } - } - @Override public void open(int taskNumber, int numTasks) throws IOException { DorisStreamLoad dorisStreamLoad = new DorisStreamLoad(options); @@ -76,12 +60,12 @@ public void open(int taskNumber, int numTasks) throws IOException { } @Override - protected void openInternal(int taskNumber, int numTasks) throws IOException { + protected void openInternal(int taskNumber, int numTasks) { LOG.info("task number : {} , number task : {}", taskNumber, numTasks); } @Override - protected void closeInternal() throws IOException {} + protected void closeInternal() {} @Override protected void writeSingleRecordInternal(RowData rowData) throws WriteRecordException { diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisHttpOutputFormatBuilder.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisHttpOutputFormatBuilder.java index 5e230005de..b86494b233 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisHttpOutputFormatBuilder.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisHttpOutputFormatBuilder.java @@ -24,12 +24,6 @@ import java.util.List; -/** - * Company:www.dtstack.com. - * - * @author shitou - * @date 2021/11/8 - */ public class DorisHttpOutputFormatBuilder extends BaseRichOutputFormatBuilder { diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisSinkFactory.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisSinkFactory.java index 4ae72ba7a0..8000aaaea1 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisSinkFactory.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisSinkFactory.java @@ -22,21 +22,34 @@ import com.dtstack.chunjun.conf.SyncConf; import com.dtstack.chunjun.connector.doris.converter.DorisRowTypeConverter; import com.dtstack.chunjun.connector.doris.options.DorisConf; -import com.dtstack.chunjun.connector.doris.options.DorisConfBuilder; import com.dtstack.chunjun.connector.doris.options.LoadConf; import com.dtstack.chunjun.connector.doris.options.LoadConfBuilder; +import com.dtstack.chunjun.connector.jdbc.adapter.ConnectionAdapter; +import com.dtstack.chunjun.connector.jdbc.conf.ConnectionConf; +import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect; +import com.dtstack.chunjun.connector.jdbc.exclusion.FieldNameExclusionStrategy; +import com.dtstack.chunjun.connector.jdbc.util.JdbcUtil; +import com.dtstack.chunjun.connector.mysql.dialect.MysqlDialect; +import com.dtstack.chunjun.converter.AbstractRowConverter; import com.dtstack.chunjun.converter.RawTypeConverter; import com.dtstack.chunjun.sink.SinkFactory; +import com.dtstack.chunjun.util.GsonUtil; +import com.dtstack.chunjun.util.TableUtil; +import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import org.apache.commons.lang3.tuple.Pair; + +import java.sql.Connection; import java.util.List; import java.util.Properties; -import static com.dtstack.chunjun.connector.doris.options.DorisKeys.BATCH_SIZE_KEY; -import static com.dtstack.chunjun.connector.doris.options.DorisKeys.DATABASE_KEY; import static com.dtstack.chunjun.connector.doris.options.DorisKeys.DESERIALIZE_ARROW_ASYNC_KEY; import static com.dtstack.chunjun.connector.doris.options.DorisKeys.DESERIALIZE_QUEUE_SIZE_KEY; import static com.dtstack.chunjun.connector.doris.options.DorisKeys.DORIS_BATCH_SIZE_DEFAULT; @@ -47,33 +60,16 @@ import static com.dtstack.chunjun.connector.doris.options.DorisKeys.DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT; import static com.dtstack.chunjun.connector.doris.options.DorisKeys.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT; import static com.dtstack.chunjun.connector.doris.options.DorisKeys.DORIS_REQUEST_RETRIES_DEFAULT; -import static com.dtstack.chunjun.connector.doris.options.DorisKeys.DORIS_WRITE_MODE_DEFAULT; import static com.dtstack.chunjun.connector.doris.options.DorisKeys.EXEC_MEM_LIMIT_KEY; -import static com.dtstack.chunjun.connector.doris.options.DorisKeys.FE_NODES_KEY; -import static com.dtstack.chunjun.connector.doris.options.DorisKeys.FLUSH_INTERNAL_MS_KEY; import static com.dtstack.chunjun.connector.doris.options.DorisKeys.LOAD_OPTIONS_KEY; -import static com.dtstack.chunjun.connector.doris.options.DorisKeys.LOAD_PROPERTIES_KEY; -import static com.dtstack.chunjun.connector.doris.options.DorisKeys.MAX_RETRIES_KEY; -import static com.dtstack.chunjun.connector.doris.options.DorisKeys.PASSWORD_KEY; import static com.dtstack.chunjun.connector.doris.options.DorisKeys.REQUEST_BATCH_SIZE_KEY; import static com.dtstack.chunjun.connector.doris.options.DorisKeys.REQUEST_CONNECT_TIMEOUT_MS_KEY; import static com.dtstack.chunjun.connector.doris.options.DorisKeys.REQUEST_QUERY_TIMEOUT_S_KEY; import static com.dtstack.chunjun.connector.doris.options.DorisKeys.REQUEST_READ_TIMEOUT_MS_KEY; import static com.dtstack.chunjun.connector.doris.options.DorisKeys.REQUEST_RETRIES_KEY; import static com.dtstack.chunjun.connector.doris.options.DorisKeys.REQUEST_TABLET_SIZE_KEY; -import static com.dtstack.chunjun.connector.doris.options.DorisKeys.TABLE_KEY; -import static com.dtstack.chunjun.connector.doris.options.DorisKeys.USER_NAME_KEY; -import static com.dtstack.chunjun.connector.doris.options.DorisKeys.WAITRETRIES_MS_KEY; -import static com.dtstack.chunjun.connector.doris.options.DorisKeys.WRITE_MODE_KEY; -/** - * Company:www.dtstack.com. - * - * @author shitou - * @date 2021/11/8 - */ public class DorisSinkFactory extends SinkFactory { - private final DorisConf options; public DorisSinkFactory(SyncConf syncConf) { @@ -81,7 +77,16 @@ public DorisSinkFactory(SyncConf syncConf) { final OperatorConf parameter = syncConf.getWriter(); - DorisConfBuilder dorisConfBuilder = new DorisConfBuilder(); + Gson gson = + new GsonBuilder() + .registerTypeAdapter( + ConnectionConf.class, new ConnectionAdapter("SinkConnectionConf")) + .addDeserializationExclusionStrategy( + new FieldNameExclusionStrategy("column")) + .create(); + GsonUtil.setTypeAdapter(gson); + options = gson.fromJson(gson.toJson(syncConf.getWriter().getParameter()), DorisConf.class); + LoadConfBuilder loadConfBuilder = new LoadConfBuilder(); Properties properties = parameter.getProperties(LOAD_OPTIONS_KEY, new Properties()); @@ -130,37 +135,59 @@ public DorisSinkFactory(SyncConf syncConf) { DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT)) .build(); - options = - dorisConfBuilder - .setDatabase(parameter.getStringVal(DATABASE_KEY)) - .setTable(parameter.getStringVal(TABLE_KEY)) - .setFeNodes((List) parameter.getVal(FE_NODES_KEY)) - .setLoadOptions(loadConf) - .setLoadProperties( - parameter.getProperties(LOAD_PROPERTIES_KEY, new Properties())) - .setPassword(parameter.getStringVal(PASSWORD_KEY, "")) - .setNameMapped( - syncConf.getNameMappingConf() != null - && !syncConf.getNameMappingConf().isEmpty()) - .setWriteMode( - parameter.getStringVal(WRITE_MODE_KEY, DORIS_WRITE_MODE_DEFAULT)) - .setUsername(parameter.getStringVal(USER_NAME_KEY)) - .setBatchSize(parameter.getIntVal(BATCH_SIZE_KEY, 1000)) - .setFlushIntervalMills(parameter.getLongVal(FLUSH_INTERNAL_MS_KEY, 10000L)) - .setMaxRetries(parameter.getIntVal(MAX_RETRIES_KEY, 1)) - .setWaitRetryMills(parameter.getLongVal(WAITRETRIES_MS_KEY, 18000L)) - .build(); options.setColumn(syncConf.getWriter().getFieldList()); + options.setLoadProperties(properties); + options.setLoadConf(loadConf); super.initCommonConf(options); } @Override public DataStreamSink createSink(DataStream dataSet) { - DorisHttpOutputFormatBuilder builder = new DorisHttpOutputFormatBuilder(); - builder.setDorisOptions(options); + if (options.getFeNodes() != null) { + DorisHttpOutputFormatBuilder builder = new DorisHttpOutputFormatBuilder(); + builder.setDorisOptions(options); + return createOutput(dataSet, builder.finish()); + } + + DorisJdbcOutputFormatBuilder builder = + new DorisJdbcOutputFormatBuilder(new DorisJdbcOutputFormat()); + + MysqlDialect dialect = new MysqlDialect(); + initColumnInfo(options, dialect, builder); + builder.setJdbcConf(options); + builder.setDdlConf(ddlConf); + + builder.setJdbcDialect(dialect); + + AbstractRowConverter rowConverter; + final RowType rowType = TableUtil.createRowType(options.getColumn(), getRawTypeConverter()); + // 同步任务使用transform + if (!useAbstractBaseColumn) { + rowConverter = dialect.getRowConverter(rowType); + } else { + rowConverter = dialect.getColumnConverter(rowType, options); + } + builder.setRowConverter(rowConverter, useAbstractBaseColumn); return createOutput(dataSet, builder.finish()); } + protected void initColumnInfo( + DorisConf conf, JdbcDialect dialect, DorisJdbcOutputFormatBuilder builder) { + Connection conn = JdbcUtil.getConnection(conf, dialect); + + // get table metadata + Tuple3 tableIdentify = dialect.getTableIdentify().apply(conf); + Pair, List> tableMetaData = + JdbcUtil.getTableMetaData( + tableIdentify.f0, tableIdentify.f1, tableIdentify.f2, conn); + + Pair, List> selectedColumnInfo = + JdbcUtil.buildColumnWithMeta(conf, tableMetaData, null); + builder.setColumnNameList(selectedColumnInfo.getLeft()); + builder.setColumnTypeList(selectedColumnInfo.getRight()); + JdbcUtil.closeDbResources(null, null, conn, false); + } + @Override public RawTypeConverter getRawTypeConverter() { return DorisRowTypeConverter::apply; diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/source/DorisSourceFactory.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/source/DorisSourceFactory.java new file mode 100644 index 0000000000..3d40bf60cd --- /dev/null +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/source/DorisSourceFactory.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.doris.source; + +import com.dtstack.chunjun.conf.SyncConf; +import com.dtstack.chunjun.connector.doris.converter.DorisRowTypeConverter; +import com.dtstack.chunjun.connector.jdbc.source.JdbcSourceFactory; +import com.dtstack.chunjun.connector.mysql.dialect.MysqlDialect; +import com.dtstack.chunjun.converter.RawTypeConverter; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +public class DorisSourceFactory extends JdbcSourceFactory { + + public DorisSourceFactory(SyncConf syncConf, StreamExecutionEnvironment env) { + super(syncConf, env, new MysqlDialect()); + } + + @Override + public RawTypeConverter getRawTypeConverter() { + return DorisRowTypeConverter::apply; + } +} diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/table/DorisDynamicTableFactory.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/table/DorisDynamicTableFactory.java index 5bfd2117aa..a442959ce7 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/table/DorisDynamicTableFactory.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/table/DorisDynamicTableFactory.java @@ -56,14 +56,7 @@ import static com.dtstack.chunjun.connector.jdbc.options.JdbcLookupOptions.VERTX_PREFIX; import static com.dtstack.chunjun.connector.jdbc.options.JdbcLookupOptions.getLibConfMap; -/** - * declare doris table factory info. - * - *

Company: www.dtstack.com - * - * @author xuchao - * @date 2021-11-21 - */ +/** declare doris table factory info. */ public class DorisDynamicTableFactory extends JdbcDynamicTableFactory implements DynamicTableSinkFactory { @@ -185,7 +178,7 @@ private static DorisConf getConfByOptions(ReadableConfig config) { dorisConf.setLoadConf(loadConf); dorisConf.setLoadProperties(new Properties()); dorisConf.setMaxRetries(config.get(DorisOptions.MAX_RETRIES)); - dorisConf.setWriteMode(config.get(DorisOptions.WRITE_MODE)); + dorisConf.setMode(config.get(DorisOptions.WRITE_MODE)); dorisConf.setBatchSize(config.get(DorisOptions.BATCH_SIZE)); return dorisConf; diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/conf/JdbcConf.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/conf/JdbcConf.java index 09e3081c54..dfe13a856d 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/conf/JdbcConf.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/conf/JdbcConf.java @@ -43,68 +43,68 @@ public class JdbcConf extends ChunJunCommonConf implements Serializable { protected String nullDelim; /** for sqlserver */ - private boolean withNoLock; + protected boolean withNoLock; // common - private Properties properties; + protected Properties properties; // reader - private String username; - private String password; - private List connection; - private String where; - private String customSql; - private String orderByColumn; - private String querySql; - private String splitPk; - private String splitStrategy; - private int fetchSize = 0; - private int queryTimeOut = 0; + protected String username; + protected String password; + protected List connection; + protected String where; + protected String customSql; + protected String orderByColumn; + protected String querySql; + protected String splitPk; + protected String splitStrategy; + protected int fetchSize = 0; + protected int queryTimeOut = 0; // 连接超时时间 - private int connectTimeOut = 0; + protected int connectTimeOut = 0; /** 是否为增量任务 */ - private boolean increment = false; + protected boolean increment = false; /** 是否为增量轮询 */ - private boolean polling = false; + protected boolean polling = false; /** * Whether to take the maximum value of incrementColumn in db as startLocation in polling mode */ - private boolean pollingFromMax = false; + protected boolean pollingFromMax = false; /** 字段名称 */ - private String increColumn; + protected String increColumn; /** Whether an OrderBy sort is required,increment mode need set to true. */ - private boolean isOrderBy = true; + protected boolean isOrderBy = true; /** 字段索引 */ - private int increColumnIndex = -1; + protected int increColumnIndex = -1; /** 字段类型 */ - private String increColumnType; + protected String increColumnType; /** 字段初始值 */ - private String startLocation; + protected String startLocation; /** 轮询时间间隔 */ - private long pollingInterval = 5000; + protected long pollingInterval = 5000; /** restore字段名称 */ - private String restoreColumn; + protected String restoreColumn; /** restore字段类型 */ - private String restoreColumnType; + protected String restoreColumnType; /** restore字段索引 */ - private int restoreColumnIndex = -1; + protected int restoreColumnIndex = -1; /** 用于标记是否保存endLocation位置的一条或多条数据 true:不保存 false(默认):保存 某些情况下可能出现最后几条数据被重复记录的情况,可以将此参数配置为true */ - private boolean useMaxFunc = false; + protected boolean useMaxFunc = false; // writer /** 增量同步或者间隔轮询时,是否初始化外部存储 */ - private Boolean initReporter = true; + protected Boolean initReporter = true; @SerializedName(value = "mode", alternate = "writeMode") - private String mode = "INSERT"; + protected String mode = "INSERT"; - private List preSql; - private List postSql; - private List uniqueKey; - @Deprecated private Map> updateKey; + protected List preSql; + protected List postSql; + protected List uniqueKey; + @Deprecated protected Map> updateKey; /** upsert 写数据库时,是否null覆盖原来的值 */ - private boolean allReplace = false; + protected boolean allReplace = false; public Boolean getInitReporter() { return initReporter; diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/util/JdbcUtil.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/util/JdbcUtil.java index 96794d98c2..caa48b8972 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/util/JdbcUtil.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/util/JdbcUtil.java @@ -140,6 +140,9 @@ public static Pair, List> getTableMetaData( public static Pair, List> getTableMetaData( String cataLog, String schema, String tableName, Connection dbConn, String querySql) { try { + if (StringUtils.isEmpty(schema)) { + schema = cataLog; + } if (StringUtils.isBlank(querySql)) { // check table exists if (ALL_TABLE.equalsIgnoreCase(tableName.trim())) {