diff --git a/docs/en/connector-v2/source/Doris.md b/docs/en/connector-v2/source/Doris.md index c67444b58c8..373b84f8fdd 100644 --- a/docs/en/connector-v2/source/Doris.md +++ b/docs/en/connector-v2/source/Doris.md @@ -13,15 +13,14 @@ - [x] [batch](../../concept/connector-v2-features.md) - [ ] [stream](../../concept/connector-v2-features.md) - [ ] [exactly-once](../../concept/connector-v2-features.md) -- [x] [schema projection](../../concept/connector-v2-features.md) +- [x] [column projection](../../concept/connector-v2-features.md) - [x] [parallelism](../../concept/connector-v2-features.md) - [x] [support user-defined split](../../concept/connector-v2-features.md) +- [x] [support multiple table read](../../concept/connector-v2-features.md) ## Description -Used to read data from Doris. -Doris Source will send a SQL to FE, FE will parse it into an execution plan, send it to BE, and BE will -directly return the data +Used to read data from Apache Doris. ## Supported DataSource Info @@ -29,11 +28,6 @@ directly return the data |------------|--------------------------------------|--------|-----|-------| | Doris | Only Doris2.0 or later is supported. | - | - | - | -## Database Dependency - -> Please download the support list corresponding to 'Maven' and copy it to the '$SEATNUNNEL_HOME/plugins/jdbc/lib/' -> working directory
- ## Data Type Mapping | Doris Data type | SeaTunnel Data type | @@ -54,29 +48,40 @@ directly return the data ## Source Options +Base configuration: + | Name | Type | Required | Default | Description | |----------------------------------|--------|----------|------------|-----------------------------------------------------------------------------------------------------| | fenodes | string | yes | - | FE address, the format is `"fe_host:fe_http_port"` | | username | string | yes | - | User username | | password | string | yes | - | User password | +| doris.request.retries | int | no | 3 | Number of retries to send requests to Doris FE. | +| doris.request.read.timeout.ms | int | no | 30000 | | +| doris.request.connect.timeout.ms | int | no | 30000 | | +| query-port | string | no | 9030 | Doris QueryPort | +| doris.request.query.timeout.s | int | no | 3600 | Timeout period of Doris scan data, expressed in seconds. | +| table_list | string | 否 | - | table list | + +Table list configuration: + +| Name | Type | Required | Default | Description | +|----------------------------------|--------|----------|------------|-----------------------------------------------------------------------------------------------------| | database | string | yes | - | The name of Doris database | | table | string | yes | - | The name of Doris table | | doris.read.field | string | no | - | Use the 'doris.read.field' parameter to select the doris table columns to read | -| query-port | string | no | 9030 | Doris QueryPort | | doris.filter.query | string | no | - | Data filtering in doris. the format is "field = value",example : doris.filter.query = "F_ID > 2" | | doris.batch.size | int | no | 1024 | The maximum value that can be obtained by reading Doris BE once. | -| doris.request.query.timeout.s | int | no | 3600 | Timeout period of Doris scan data, expressed in seconds. | | doris.exec.mem.limit | long | no | 2147483648 | Maximum memory that can be used by a single be scan request. The default memory is 2G (2147483648). | -| doris.request.retries | int | no | 3 | Number of retries to send requests to Doris FE. | -| doris.request.read.timeout.ms | int | no | 30000 | | -| doris.request.connect.timeout.ms | int | no | 30000 | | + +Note: When this configuration corresponds to a single table, you can flatten the configuration items in table_list to the outer layer. ### Tips > It is not recommended to modify advanced parameters at will -## Task Example +## Example +### single table > This is an example of reading a Doris table and writing to Console. ``` @@ -159,4 +164,49 @@ sink { Console {} } ``` +### Multiple table +``` +env{ + parallelism = 1 + job.mode = "BATCH" +} +source{ + Doris { + fenodes = "xxxx:8030" + username = root + password = "" + table_list = [ + { + database = "st_source_0" + table = "doris_table_0" + doris.read.field = "F_ID,F_INT,F_BIGINT,F_TINYINT" + doris.filter.query = "F_ID >= 50" + }, + { + database = "st_source_1" + table = "doris_table_1" + } + ] + } +} + +transform {} + +sink{ + Doris { + fenodes = "xxxx:8030" + schema_save_mode = "RECREATE_SCHEMA" + username = root + password = "" + database = "st_sink" + table = "${table_name}" + sink.enable-2pc = "true" + sink.label-prefix = "test_json" + doris.config = { + format="json" + read_json_by_line="true" + } + } +} +``` diff --git a/docs/zh/connector-v2/source/Doris.md b/docs/zh/connector-v2/source/Doris.md new file mode 100644 index 00000000000..ba3549473a5 --- /dev/null +++ b/docs/zh/connector-v2/source/Doris.md @@ -0,0 +1,212 @@ +# Doris + +> Doris 源连接器 + +## 支持的引擎 + +> Spark
+> Flink
+> SeaTunnel Zeta
+ +## 主要功能 + +- [x] [批处理](../../concept/connector-v2-features.md) +- [ ] [流处理](../../concept/connector-v2-features.md) +- [ ] [精确一次](../../concept/connector-v2-features.md) +- [x] [列投影](../../concept/connector-v2-features.md) +- [x] [并行度](../../concept/connector-v2-features.md) +- [x] [支持用户自定义分片](../../concept/connector-v2-features.md) +- [x] [支持多表读](../../concept/connector-v2-features.md) + +## 描述 + +用于 Apache Doris 的源连接器。 + +## 支持的数据源信息 + +| 数据源 | 支持版本 | 驱动 | Url | Maven | +|------------|--------------------------------------|--------|-----|-------| +| Doris | 仅支持Doris2.0及以上版本. | - | - | - | + +## 数据类型映射 + +| Doris 数据类型 | SeaTunnel 数据类型 | +|--------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------| +| INT | INT | +| TINYINT | TINYINT | +| SMALLINT | SMALLINT | +| BIGINT | BIGINT | +| LARGEINT | STRING | +| BOOLEAN | BOOLEAN | +| DECIMAL | DECIMAL((Get the designated column's specified column size)+1,
(Gets the designated column's number of digits to right of the decimal point.))) | +| FLOAT | FLOAT | +| DOUBLE | DOUBLE | +| CHAR
VARCHAR
STRING
TEXT | STRING | +| DATE | DATE | +| DATETIME
DATETIME(p) | TIMESTAMP | +| ARRAY | ARRAY | + +## 源选项 + +基础配置: + +| 名称 | 类型 | 是否必须 | 默认值 | 描述 | +|----------------------------------|--------|----------|------------|-----------------------------------------------------------------------------------------------------| +| fenodes | string | yes | - | FE 地址, 格式:`"fe_host:fe_http_port"` | +| username | string | yes | - | 用户名 | +| password | string | yes | - | 密码 | +| doris.request.retries | int | no | 3 | 请求Doris FE的重试次数 | +| doris.request.read.timeout.ms | int | no | 30000 | | +| doris.request.connect.timeout.ms | int | no | 30000 | | +| query-port | string | no | 9030 | Doris查询端口 | +| doris.request.query.timeout.s | int | no | 3600 | Doris扫描数据的超时时间,单位秒 | +| table_list | string | 否 | - | 表清单 | + +表清单配置: + +| 名称 | 类型 | 是否必须 | 默认值 | 描述 | +|----------------------------------|--------|----------|------------|-----------------------------------------------------------------------------------------------------| +| database | string | yes | - | 数据库 | +| table | string | yes | - | 表名 | +| doris.read.field | string | no | - | 选择要读取的Doris表字段 | +| doris.filter.query | string | no | - | 数据过滤. 格式:"字段 = 值", 例如:doris.filter.query = "F_ID > 2" | +| doris.batch.size | int | no | 1024 | 每次能够从BE中读取到的最大行数 | +| doris.exec.mem.limit | long | no | 2147483648 | 单个be扫描请求可以使用的最大内存。默认内存为2G(2147483648) | + +注意: 当此配置对应于单个表时,您可以将table_list中的配置项展平到外层。 + +### 提示 + +> 不建议随意修改高级参数 + +## 例子 + +### 单表 +> 这是一个从doris读取数据后,输出到控制台的例子: + +``` +env { + parallelism = 2 + job.mode = "BATCH" +} +source{ + Doris { + fenodes = "doris_e2e:8030" + username = root + password = "" + database = "e2e_source" + table = "doris_e2e_table" + } +} + +transform { + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/transform/sql +} + +sink { + Console {} +} +``` + +使用`doris.read.field`参数来选择需要读取的Doris表字段: + +``` +env { + parallelism = 2 + job.mode = "BATCH" +} +source{ + Doris { + fenodes = "doris_e2e:8030" + username = root + password = "" + database = "e2e_source" + table = "doris_e2e_table" + doris.read.field = "F_ID,F_INT,F_BIGINT,F_TINYINT,F_SMALLINT" + } +} + +transform { + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/transform/sql +} + +sink { + Console {} +} +``` + +使用`doris.filter.query`来过滤数据,参数值将作为过滤条件直接传递到doris: + +``` +env { + parallelism = 2 + job.mode = "BATCH" +} +source{ + Doris { + fenodes = "doris_e2e:8030" + username = root + password = "" + database = "e2e_source" + table = "doris_e2e_table" + doris.filter.query = "F_ID > 2" + } +} + +transform { + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/transform/sql +} + +sink { + Console {} +} +``` +### 多表 +``` +env{ + parallelism = 1 + job.mode = "BATCH" +} + +source{ + Doris { + fenodes = "xxxx:8030" + username = root + password = "" + table_list = [ + { + database = "st_source_0" + table = "doris_table_0" + doris.read.field = "F_ID,F_INT,F_BIGINT,F_TINYINT" + doris.filter.query = "F_ID >= 50" + }, + { + database = "st_source_1" + table = "doris_table_1" + } + ] + } +} + +transform {} + +sink{ + Doris { + fenodes = "xxxx:8030" + schema_save_mode = "RECREATE_SCHEMA" + username = root + password = "" + database = "st_sink" + table = "${table_name}" + sink.enable-2pc = "true" + sink.label-prefix = "test_json" + doris.config = { + format="json" + read_json_by_line="true" + } + } +} +``` diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/backend/BackendClient.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/backend/BackendClient.java index 31bdb2a78e7..04f96d2d607 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/backend/BackendClient.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/backend/BackendClient.java @@ -25,7 +25,7 @@ import org.apache.seatunnel.shade.org.apache.thrift.transport.TTransport; import org.apache.seatunnel.shade.org.apache.thrift.transport.TTransportException; -import org.apache.seatunnel.connectors.doris.config.DorisConfig; +import org.apache.seatunnel.connectors.doris.config.DorisSourceConfig; import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode; import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException; import org.apache.seatunnel.connectors.doris.source.serialization.Routing; @@ -55,7 +55,7 @@ public class BackendClient { private final int socketTimeout; private final int connectTimeout; - public BackendClient(Routing routing, DorisConfig readOptions) { + public BackendClient(Routing routing, DorisSourceConfig readOptions) { this.routing = routing; this.connectTimeout = readOptions.getRequestConnectTimeoutMs(); this.socketTimeout = readOptions.getRequestReadTimeoutMs(); diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java index a7f5eabf63d..324200e5e4d 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java @@ -37,7 +37,6 @@ import org.apache.seatunnel.common.exception.CommonError; import org.apache.seatunnel.common.exception.CommonErrorCode; import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; -import org.apache.seatunnel.connectors.doris.config.DorisConfig; import org.apache.seatunnel.connectors.doris.config.DorisOptions; import org.apache.seatunnel.connectors.doris.datatype.DorisTypeConverterFactory; import org.apache.seatunnel.connectors.doris.datatype.DorisTypeConverterV2; @@ -85,7 +84,7 @@ public class DorisCatalog implements Catalog { private Connection conn; - private DorisConfig dorisConfig; + private String createTableTemplate; private String dorisVersion; @@ -110,9 +109,9 @@ public DorisCatalog( Integer queryPort, String username, String password, - DorisConfig config) { + String createTableTemplate) { this(catalogName, frontEndNodes, queryPort, username, password); - this.dorisConfig = config; + this.createTableTemplate = createTableTemplate; } public DorisCatalog( @@ -121,9 +120,9 @@ public DorisCatalog( Integer queryPort, String username, String password, - DorisConfig config, + String createTableTemplate, String defaultDatabase) { - this(catalogName, frontEndNodes, queryPort, username, password, config); + this(catalogName, frontEndNodes, queryPort, username, password, createTableTemplate); this.defaultDatabase = defaultDatabase; } @@ -414,7 +413,7 @@ public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreI String stmt = DorisCatalogUtil.getCreateTableStatement( - dorisConfig.getCreateTableTemplate(), tablePath, table, typeConverter); + createTableTemplate, tablePath, table, typeConverter); try (Statement statement = conn.createStatement()) { statement.execute(stmt); } catch (SQLException e) { @@ -510,7 +509,7 @@ public PreviewResult previewAction( checkArgument(catalogTable.isPresent(), "CatalogTable cannot be null"); return new SQLPreviewResult( DorisCatalogUtil.getCreateTableStatement( - dorisConfig.getCreateTableTemplate(), + createTableTemplate, tablePath, catalogTable.get(), // used for test when typeConverter is null diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalogFactory.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalogFactory.java index 1071b52f05a..7fd1da603e2 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalogFactory.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalogFactory.java @@ -22,11 +22,14 @@ import org.apache.seatunnel.api.table.catalog.Catalog; import org.apache.seatunnel.api.table.factory.CatalogFactory; import org.apache.seatunnel.api.table.factory.Factory; -import org.apache.seatunnel.connectors.doris.config.DorisConfig; import org.apache.seatunnel.connectors.doris.config.DorisOptions; +import org.apache.seatunnel.connectors.doris.config.DorisSinkOptions; import com.google.auto.service.AutoService; +import static org.apache.seatunnel.connectors.doris.config.DorisOptions.IDENTIFIER; +import static org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.SAVE_MODE_CREATE_TEMPLATE; + @AutoService(Factory.class) public class DorisCatalogFactory implements CatalogFactory { @@ -38,13 +41,13 @@ public Catalog createCatalog(String catalogName, ReadonlyConfig options) { options.get(DorisOptions.QUERY_PORT), options.get(DorisOptions.USERNAME), options.get(DorisOptions.PASSWORD), - DorisConfig.of(options), - options.get(DorisOptions.DEFAULT_DATABASE)); + options.get(SAVE_MODE_CREATE_TEMPLATE), + options.get(DorisSinkOptions.DEFAULT_DATABASE)); } @Override public String factoryIdentifier() { - return DorisConfig.IDENTIFIER; + return IDENTIFIER; } @Override diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisConfig.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisConfig.java deleted file mode 100644 index f7155e8a647..00000000000 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisConfig.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.connectors.doris.config; - -import org.apache.seatunnel.shade.com.typesafe.config.Config; - -import org.apache.seatunnel.api.configuration.ReadonlyConfig; - -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; - -import java.io.Serializable; -import java.util.Map; -import java.util.Properties; - -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DATABASE; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DORIS_BATCH_SIZE; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DORIS_DESERIALIZE_ARROW_ASYNC; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DORIS_DESERIALIZE_QUEUE_SIZE; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DORIS_EXEC_MEM_LIMIT; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DORIS_FILTER_QUERY; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DORIS_READ_FIELD; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DORIS_REQUEST_QUERY_TIMEOUT_S; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DORIS_REQUEST_READ_TIMEOUT_MS; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DORIS_REQUEST_RETRIES; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DORIS_SINK_CONFIG_PREFIX; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DORIS_TABLET_SIZE; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.FENODES; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.NEEDS_UNSUPPORTED_TYPE_CASTING; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.PASSWORD; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.QUERY_PORT; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.SAVE_MODE_CREATE_TEMPLATE; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.SINK_BUFFER_COUNT; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.SINK_BUFFER_SIZE; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.SINK_CHECK_INTERVAL; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.SINK_ENABLE_2PC; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.SINK_ENABLE_DELETE; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.SINK_LABEL_PREFIX; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.SINK_MAX_RETRIES; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.USERNAME; - -@Setter -@Getter -@ToString -public class DorisConfig implements Serializable { - - public static final String IDENTIFIER = "Doris"; - - // common option - private String frontends; - private String database; - private String table; - private String username; - private String password; - private Integer queryPort; - private int batchSize; - - // source option - private String readField; - private String filterQuery; - private Integer tabletSize; - private Integer requestConnectTimeoutMs; - private Integer requestReadTimeoutMs; - private Integer requestQueryTimeoutS; - private Integer requestRetries; - private Boolean deserializeArrowAsync; - private int deserializeQueueSize; - private Long execMemLimit; - private boolean useOldApi; - - // sink option - private Boolean enable2PC; - private Boolean enableDelete; - private String labelPrefix; - private Integer checkInterval; - private Integer maxRetries; - private Integer bufferSize; - private Integer bufferCount; - private Properties streamLoadProps; - private boolean needsUnsupportedTypeCasting; - - // create table option - private String createTableTemplate; - - public static DorisConfig of(Config pluginConfig) { - return of(ReadonlyConfig.fromConfig(pluginConfig)); - } - - public static DorisConfig of(ReadonlyConfig config) { - - DorisConfig dorisConfig = new DorisConfig(); - - // common option - dorisConfig.setFrontends(config.get(FENODES)); - dorisConfig.setUsername(config.get(USERNAME)); - dorisConfig.setPassword(config.get(PASSWORD)); - dorisConfig.setQueryPort(config.get(QUERY_PORT)); - dorisConfig.setStreamLoadProps(parseStreamLoadProperties(config)); - dorisConfig.setDatabase(config.get(DATABASE)); - dorisConfig.setTable(config.get(TABLE)); - - // source option - dorisConfig.setReadField(config.get(DORIS_READ_FIELD)); - dorisConfig.setFilterQuery(config.get(DORIS_FILTER_QUERY)); - dorisConfig.setTabletSize(config.get(DORIS_TABLET_SIZE)); - dorisConfig.setRequestConnectTimeoutMs(config.get(DORIS_REQUEST_CONNECT_TIMEOUT_MS)); - dorisConfig.setRequestQueryTimeoutS(config.get(DORIS_REQUEST_QUERY_TIMEOUT_S)); - dorisConfig.setRequestReadTimeoutMs(config.get(DORIS_REQUEST_READ_TIMEOUT_MS)); - dorisConfig.setRequestRetries(config.get(DORIS_REQUEST_RETRIES)); - dorisConfig.setDeserializeArrowAsync(config.get(DORIS_DESERIALIZE_ARROW_ASYNC)); - dorisConfig.setDeserializeQueueSize(config.get(DORIS_DESERIALIZE_QUEUE_SIZE)); - dorisConfig.setBatchSize(config.get(DORIS_BATCH_SIZE)); - dorisConfig.setExecMemLimit(config.get(DORIS_EXEC_MEM_LIMIT)); - - // sink option - dorisConfig.setEnable2PC(config.get(SINK_ENABLE_2PC)); - dorisConfig.setLabelPrefix(config.get(SINK_LABEL_PREFIX)); - dorisConfig.setCheckInterval(config.get(SINK_CHECK_INTERVAL)); - dorisConfig.setMaxRetries(config.get(SINK_MAX_RETRIES)); - dorisConfig.setBufferSize(config.get(SINK_BUFFER_SIZE)); - dorisConfig.setBufferCount(config.get(SINK_BUFFER_COUNT)); - dorisConfig.setEnableDelete(config.get(SINK_ENABLE_DELETE)); - dorisConfig.setNeedsUnsupportedTypeCasting(config.get(NEEDS_UNSUPPORTED_TYPE_CASTING)); - - // create table option - dorisConfig.setCreateTableTemplate(config.get(SAVE_MODE_CREATE_TEMPLATE)); - - return dorisConfig; - } - - private static Properties parseStreamLoadProperties(ReadonlyConfig config) { - Properties streamLoadProps = new Properties(); - if (config.getOptional(DORIS_SINK_CONFIG_PREFIX).isPresent()) { - Map map = config.getOptional(DORIS_SINK_CONFIG_PREFIX).get(); - map.forEach( - (key, value) -> { - streamLoadProps.put(key.toLowerCase(), value); - }); - } - return streamLoadProps; - } -} diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java index ddf1195b6ed..bcdf24c9d7b 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java @@ -20,32 +20,12 @@ import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; import org.apache.seatunnel.api.configuration.util.OptionRule; -import org.apache.seatunnel.api.sink.DataSaveMode; -import org.apache.seatunnel.api.sink.SaveModePlaceHolder; -import org.apache.seatunnel.api.sink.SchemaSaveMode; - -import java.util.Map; - -import static org.apache.seatunnel.api.sink.SinkCommonOptions.MULTI_TABLE_SINK_REPLICA; public interface DorisOptions { - int DORIS_TABLET_SIZE_MIN = 1; - int DORIS_TABLET_SIZE_DEFAULT = Integer.MAX_VALUE; - int DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT = 30 * 1000; - int DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT = 30 * 1000; - int DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT = 3600; - int DORIS_REQUEST_RETRIES_DEFAULT = 3; - Boolean DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT = false; - int DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT = 64; - int DORIS_BATCH_SIZE_DEFAULT = 1024; - long DORIS_EXEC_MEM_LIMIT_DEFAULT = 2147483648L; - int DEFAULT_SINK_CHECK_INTERVAL = 10000; - int DEFAULT_SINK_MAX_RETRIES = 3; - int DEFAULT_SINK_BUFFER_SIZE = 256 * 1024; - int DEFAULT_SINK_BUFFER_COUNT = 3; - + String IDENTIFIER = "Doris"; String DORIS_DEFAULT_CLUSTER = "default_cluster"; + int DORIS_BATCH_SIZE_DEFAULT = 1024; // common option Option FENODES = @@ -72,6 +52,7 @@ public interface DorisOptions { .stringType() .noDefaultValue() .withDescription("the doris user name."); + Option PASSWORD = Options.key("password") .stringType() @@ -79,202 +60,17 @@ public interface DorisOptions { .withDescription("the doris password."); Option TABLE = - Options.key("table") - .stringType() - .noDefaultValue() - .withDescription("the doris table name."); + Options.key("table").stringType().noDefaultValue().withDescription("table"); + Option DATABASE = - Options.key("database") - .stringType() - .noDefaultValue() - .withDescription("the doris database name."); + Options.key("database").stringType().noDefaultValue().withDescription("database"); + Option DORIS_BATCH_SIZE = Options.key("doris.batch.size") .intType() .defaultValue(DORIS_BATCH_SIZE_DEFAULT) .withDescription("the batch size of the doris read/write."); - // source config options - Option DORIS_READ_FIELD = - Options.key("doris.read.field") - .stringType() - .noDefaultValue() - .withDescription( - "List of column names in the Doris table, separated by commas"); - Option DORIS_FILTER_QUERY = - Options.key("doris.filter.query") - .stringType() - .noDefaultValue() - .withDescription( - "Filter expression of the query, which is transparently transmitted to Doris. Doris uses this expression to complete source-side data filtering"); - Option DORIS_TABLET_SIZE = - Options.key("doris.request.tablet.size") - .intType() - .defaultValue(DORIS_TABLET_SIZE_DEFAULT) - .withDescription(""); - Option DORIS_REQUEST_CONNECT_TIMEOUT_MS = - Options.key("doris.request.connect.timeout.ms") - .intType() - .defaultValue(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT) - .withDescription(""); - Option DORIS_REQUEST_READ_TIMEOUT_MS = - Options.key("doris.request.read.timeout.ms") - .intType() - .defaultValue(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT) - .withDescription(""); - Option DORIS_REQUEST_QUERY_TIMEOUT_S = - Options.key("doris.request.query.timeout.s") - .intType() - .defaultValue(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT) - .withDescription(""); - Option DORIS_REQUEST_RETRIES = - Options.key("doris.request.retries") - .intType() - .defaultValue(DORIS_REQUEST_RETRIES_DEFAULT) - .withDescription(""); - Option DORIS_DESERIALIZE_ARROW_ASYNC = - Options.key("doris.deserialize.arrow.async") - .booleanType() - .defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT) - .withDescription(""); - Option DORIS_DESERIALIZE_QUEUE_SIZE = - Options.key("doris.request.retriesdoris.deserialize.queue.size") - .intType() - .defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT) - .withDescription(""); - - Option DORIS_EXEC_MEM_LIMIT = - Options.key("doris.exec.mem.limit") - .longType() - .defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT) - .withDescription(""); - - // sink config options - Option SINK_ENABLE_2PC = - Options.key("sink.enable-2pc") - .booleanType() - .defaultValue(false) - .withDescription("enable 2PC while loading"); - - Option SINK_CHECK_INTERVAL = - Options.key("sink.check-interval") - .intType() - .defaultValue(DEFAULT_SINK_CHECK_INTERVAL) - .withDescription("check exception with the interval while loading"); - Option SINK_MAX_RETRIES = - Options.key("sink.max-retries") - .intType() - .defaultValue(DEFAULT_SINK_MAX_RETRIES) - .withDescription("the max retry times if writing records to database failed."); - Option SINK_BUFFER_SIZE = - Options.key("sink.buffer-size") - .intType() - .defaultValue(DEFAULT_SINK_BUFFER_SIZE) - .withDescription("the buffer size to cache data for stream load."); - Option SINK_BUFFER_COUNT = - Options.key("sink.buffer-count") - .intType() - .defaultValue(DEFAULT_SINK_BUFFER_COUNT) - .withDescription("the buffer count to cache data for stream load."); - Option SINK_LABEL_PREFIX = - Options.key("sink.label-prefix") - .stringType() - .defaultValue("") - .withDescription("the unique label prefix."); - Option SINK_ENABLE_DELETE = - Options.key("sink.enable-delete") - .booleanType() - .defaultValue(false) - .withDescription("whether to enable the delete function"); - - Option> DORIS_SINK_CONFIG_PREFIX = - Options.key("doris.config") - .mapType() - .noDefaultValue() - .withDescription( - "The parameter of the Stream Load data_desc. " - + "The way to specify the parameter is to add the prefix `doris.config` to the original load parameter name "); - - Option DEFAULT_DATABASE = - Options.key("default-database") - .stringType() - .defaultValue("information_schema") - .withDescription(""); - - Option SCHEMA_SAVE_MODE = - Options.key("schema_save_mode") - .enumType(SchemaSaveMode.class) - .defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST) - .withDescription("schema_save_mode"); - - Option DATA_SAVE_MODE = - Options.key("data_save_mode") - .enumType(DataSaveMode.class) - .defaultValue(DataSaveMode.APPEND_DATA) - .withDescription("data_save_mode"); - - Option CUSTOM_SQL = - Options.key("custom_sql").stringType().noDefaultValue().withDescription("custom_sql"); - - Option NEEDS_UNSUPPORTED_TYPE_CASTING = - Options.key("needs_unsupported_type_casting") - .booleanType() - .defaultValue(false) - .withDescription( - "Whether to enable the unsupported type casting, such as Decimal64 to Double"); - - // create table - Option SAVE_MODE_CREATE_TEMPLATE = - Options.key("save_mode_create_template") - .stringType() - .defaultValue( - "CREATE TABLE IF NOT EXISTS `" - + SaveModePlaceHolder.DATABASE.getPlaceHolder() - + "`.`" - + SaveModePlaceHolder.TABLE.getPlaceHolder() - + "` (\n" - + SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder() - + ",\n" - + SaveModePlaceHolder.ROWTYPE_FIELDS.getPlaceHolder() - + "\n" - + ") ENGINE=OLAP\n" - + " UNIQUE KEY (" - + SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder() - + ")\n" - + "DISTRIBUTED BY HASH (" - + SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder() - + ")\n " - + "PROPERTIES (\n" - + "\"replication_allocation\" = \"tag.location.default: 1\",\n" - + "\"in_memory\" = \"false\",\n" - + "\"storage_format\" = \"V2\",\n" - + "\"disable_auto_compaction\" = \"false\"\n" - + ")") - .withDescription("Create table statement template, used to create Doris table"); - - OptionRule.Builder SINK_RULE = - OptionRule.builder() - .required( - FENODES, - USERNAME, - PASSWORD, - SINK_LABEL_PREFIX, - DORIS_SINK_CONFIG_PREFIX, - DATA_SAVE_MODE, - SCHEMA_SAVE_MODE) - .optional( - DATABASE, - TABLE, - TABLE_IDENTIFIER, - QUERY_PORT, - DORIS_BATCH_SIZE, - SINK_ENABLE_2PC, - SINK_ENABLE_DELETE, - MULTI_TABLE_SINK_REPLICA, - SAVE_MODE_CREATE_TEMPLATE, - NEEDS_UNSUPPORTED_TYPE_CASTING) - .conditional(DATA_SAVE_MODE, DataSaveMode.CUSTOM_PROCESSING, CUSTOM_SQL); - OptionRule.Builder CATALOG_RULE = OptionRule.builder().required(FENODES, QUERY_PORT, USERNAME, PASSWORD); } diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkConfig.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkConfig.java new file mode 100644 index 00000000000..8f0d948042f --- /dev/null +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkConfig.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.doris.config; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; + +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +import java.io.Serializable; +import java.util.Map; +import java.util.Properties; + +import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DATABASE; +import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DORIS_BATCH_SIZE; +import static org.apache.seatunnel.connectors.doris.config.DorisOptions.FENODES; +import static org.apache.seatunnel.connectors.doris.config.DorisOptions.PASSWORD; +import static org.apache.seatunnel.connectors.doris.config.DorisOptions.QUERY_PORT; +import static org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE; +import static org.apache.seatunnel.connectors.doris.config.DorisOptions.USERNAME; +import static org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.DORIS_SINK_CONFIG_PREFIX; +import static org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.NEEDS_UNSUPPORTED_TYPE_CASTING; +import static org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.SAVE_MODE_CREATE_TEMPLATE; +import static org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.SINK_BUFFER_COUNT; +import static org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.SINK_BUFFER_SIZE; +import static org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.SINK_CHECK_INTERVAL; +import static org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.SINK_ENABLE_2PC; +import static org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.SINK_ENABLE_DELETE; +import static org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.SINK_LABEL_PREFIX; +import static org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.SINK_MAX_RETRIES; + +@Setter +@Getter +@ToString +public class DorisSinkConfig implements Serializable { + + // common option + private String frontends; + private String database; + private String table; + private String username; + private String password; + private Integer queryPort; + private int batchSize; + + // sink option + private Boolean enable2PC; + private Boolean enableDelete; + private String labelPrefix; + private Integer checkInterval; + private Integer maxRetries; + private Integer bufferSize; + private Integer bufferCount; + private Properties streamLoadProps; + private boolean needsUnsupportedTypeCasting; + + // create table option + private String createTableTemplate; + + public static DorisSinkConfig of(Config pluginConfig) { + return of(ReadonlyConfig.fromConfig(pluginConfig)); + } + + public static DorisSinkConfig of(ReadonlyConfig config) { + + DorisSinkConfig dorisSinkConfig = new DorisSinkConfig(); + + // common option + dorisSinkConfig.setFrontends(config.get(FENODES)); + dorisSinkConfig.setUsername(config.get(USERNAME)); + dorisSinkConfig.setPassword(config.get(PASSWORD)); + dorisSinkConfig.setQueryPort(config.get(QUERY_PORT)); + dorisSinkConfig.setStreamLoadProps(parseStreamLoadProperties(config)); + dorisSinkConfig.setDatabase(config.get(DATABASE)); + dorisSinkConfig.setTable(config.get(TABLE)); + dorisSinkConfig.setBatchSize(config.get(DORIS_BATCH_SIZE)); + + // sink option + dorisSinkConfig.setEnable2PC(config.get(SINK_ENABLE_2PC)); + dorisSinkConfig.setLabelPrefix(config.get(SINK_LABEL_PREFIX)); + dorisSinkConfig.setCheckInterval(config.get(SINK_CHECK_INTERVAL)); + dorisSinkConfig.setMaxRetries(config.get(SINK_MAX_RETRIES)); + dorisSinkConfig.setBufferSize(config.get(SINK_BUFFER_SIZE)); + dorisSinkConfig.setBufferCount(config.get(SINK_BUFFER_COUNT)); + dorisSinkConfig.setEnableDelete(config.get(SINK_ENABLE_DELETE)); + dorisSinkConfig.setNeedsUnsupportedTypeCasting(config.get(NEEDS_UNSUPPORTED_TYPE_CASTING)); + + // create table option + dorisSinkConfig.setCreateTableTemplate(config.get(SAVE_MODE_CREATE_TEMPLATE)); + + return dorisSinkConfig; + } + + private static Properties parseStreamLoadProperties(ReadonlyConfig config) { + Properties streamLoadProps = new Properties(); + if (config.getOptional(DORIS_SINK_CONFIG_PREFIX).isPresent()) { + Map map = config.getOptional(DORIS_SINK_CONFIG_PREFIX).get(); + map.forEach( + (key, value) -> { + streamLoadProps.put(key.toLowerCase(), value); + }); + } + return streamLoadProps; + } +} diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkOptions.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkOptions.java new file mode 100644 index 00000000000..372418d12a4 --- /dev/null +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkOptions.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.doris.config; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.sink.DataSaveMode; +import org.apache.seatunnel.api.sink.SaveModePlaceHolder; +import org.apache.seatunnel.api.sink.SchemaSaveMode; + +import java.util.Map; + +import static org.apache.seatunnel.api.sink.SinkCommonOptions.MULTI_TABLE_SINK_REPLICA; +import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DATABASE; +import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DORIS_BATCH_SIZE; +import static org.apache.seatunnel.connectors.doris.config.DorisOptions.FENODES; +import static org.apache.seatunnel.connectors.doris.config.DorisOptions.PASSWORD; +import static org.apache.seatunnel.connectors.doris.config.DorisOptions.QUERY_PORT; +import static org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE; +import static org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE_IDENTIFIER; +import static org.apache.seatunnel.connectors.doris.config.DorisOptions.USERNAME; + +public interface DorisSinkOptions { + + int DEFAULT_SINK_CHECK_INTERVAL = 10000; + int DEFAULT_SINK_MAX_RETRIES = 3; + int DEFAULT_SINK_BUFFER_SIZE = 256 * 1024; + int DEFAULT_SINK_BUFFER_COUNT = 3; + + Option SINK_ENABLE_2PC = + Options.key("sink.enable-2pc") + .booleanType() + .defaultValue(false) + .withDescription("enable 2PC while loading"); + + Option SINK_CHECK_INTERVAL = + Options.key("sink.check-interval") + .intType() + .defaultValue(DEFAULT_SINK_CHECK_INTERVAL) + .withDescription("check exception with the interval while loading"); + Option SINK_MAX_RETRIES = + Options.key("sink.max-retries") + .intType() + .defaultValue(DEFAULT_SINK_MAX_RETRIES) + .withDescription("the max retry times if writing records to database failed."); + Option SINK_BUFFER_SIZE = + Options.key("sink.buffer-size") + .intType() + .defaultValue(DEFAULT_SINK_BUFFER_SIZE) + .withDescription("the buffer size to cache data for stream load."); + Option SINK_BUFFER_COUNT = + Options.key("sink.buffer-count") + .intType() + .defaultValue(DEFAULT_SINK_BUFFER_COUNT) + .withDescription("the buffer count to cache data for stream load."); + Option SINK_LABEL_PREFIX = + Options.key("sink.label-prefix") + .stringType() + .defaultValue("") + .withDescription("the unique label prefix."); + Option SINK_ENABLE_DELETE = + Options.key("sink.enable-delete") + .booleanType() + .defaultValue(false) + .withDescription("whether to enable the delete function"); + + Option> DORIS_SINK_CONFIG_PREFIX = + Options.key("doris.config") + .mapType() + .noDefaultValue() + .withDescription( + "The parameter of the Stream Load data_desc. " + + "The way to specify the parameter is to add the prefix `doris.config` to the original load parameter name "); + + Option DEFAULT_DATABASE = + Options.key("default-database") + .stringType() + .defaultValue("information_schema") + .withDescription(""); + + Option SCHEMA_SAVE_MODE = + Options.key("schema_save_mode") + .enumType(SchemaSaveMode.class) + .defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST) + .withDescription("schema_save_mode"); + + Option DATA_SAVE_MODE = + Options.key("data_save_mode") + .enumType(DataSaveMode.class) + .defaultValue(DataSaveMode.APPEND_DATA) + .withDescription("data_save_mode"); + + Option CUSTOM_SQL = + Options.key("custom_sql").stringType().noDefaultValue().withDescription("custom_sql"); + + Option NEEDS_UNSUPPORTED_TYPE_CASTING = + Options.key("needs_unsupported_type_casting") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to enable the unsupported type casting, such as Decimal64 to Double"); + + // create table + Option SAVE_MODE_CREATE_TEMPLATE = + Options.key("save_mode_create_template") + .stringType() + .defaultValue( + "CREATE TABLE IF NOT EXISTS `" + + SaveModePlaceHolder.DATABASE.getPlaceHolder() + + "`.`" + + SaveModePlaceHolder.TABLE.getPlaceHolder() + + "` (\n" + + SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder() + + ",\n" + + SaveModePlaceHolder.ROWTYPE_FIELDS.getPlaceHolder() + + "\n" + + ") ENGINE=OLAP\n" + + " UNIQUE KEY (" + + SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder() + + ")\n" + + "DISTRIBUTED BY HASH (" + + SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder() + + ")\n " + + "PROPERTIES (\n" + + "\"replication_allocation\" = \"tag.location.default: 1\",\n" + + "\"in_memory\" = \"false\",\n" + + "\"storage_format\" = \"V2\",\n" + + "\"disable_auto_compaction\" = \"false\"\n" + + ")") + .withDescription("Create table statement template, used to create Doris table"); + + OptionRule.Builder SINK_RULE = + OptionRule.builder() + .required( + FENODES, + USERNAME, + PASSWORD, + SINK_LABEL_PREFIX, + DORIS_SINK_CONFIG_PREFIX, + DATA_SAVE_MODE, + SCHEMA_SAVE_MODE) + .optional( + DATABASE, + TABLE, + TABLE_IDENTIFIER, + QUERY_PORT, + DORIS_BATCH_SIZE, + SINK_ENABLE_2PC, + SINK_ENABLE_DELETE, + MULTI_TABLE_SINK_REPLICA, + SAVE_MODE_CREATE_TEMPLATE, + NEEDS_UNSUPPORTED_TYPE_CASTING) + .conditional(DATA_SAVE_MODE, DataSaveMode.CUSTOM_PROCESSING, CUSTOM_SQL); +} diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSourceConfig.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSourceConfig.java new file mode 100644 index 00000000000..999f8fbfeaa --- /dev/null +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSourceConfig.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.doris.config; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; + +import lombok.Data; +import lombok.experimental.SuperBuilder; + +import java.io.Serializable; +import java.util.List; + +import static org.apache.seatunnel.connectors.doris.config.DorisOptions.FENODES; +import static org.apache.seatunnel.connectors.doris.config.DorisOptions.PASSWORD; +import static org.apache.seatunnel.connectors.doris.config.DorisOptions.QUERY_PORT; +import static org.apache.seatunnel.connectors.doris.config.DorisOptions.USERNAME; +import static org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_DESERIALIZE_ARROW_ASYNC; +import static org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_DESERIALIZE_QUEUE_SIZE; +import static org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS; +import static org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_REQUEST_QUERY_TIMEOUT_S; +import static org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_REQUEST_READ_TIMEOUT_MS; +import static org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_REQUEST_RETRIES; + +@Data +@SuperBuilder +public class DorisSourceConfig implements Serializable { + + private String frontends; + private Integer queryPort; + private String username; + private String password; + private Integer requestConnectTimeoutMs; + private Integer requestReadTimeoutMs; + private Integer requestQueryTimeoutS; + private Integer requestRetries; + private Boolean deserializeArrowAsync; + private int deserializeQueueSize; + private boolean useOldApi; + private List tableConfigList; + + public static DorisSourceConfig of(ReadonlyConfig config) { + DorisSourceConfigBuilder builder = DorisSourceConfig.builder(); + builder.tableConfigList(DorisTableConfig.of(config)); + builder.frontends(config.get(FENODES)); + builder.queryPort(config.get(QUERY_PORT)); + builder.username(config.get(USERNAME)); + builder.password(config.get(PASSWORD)); + builder.requestConnectTimeoutMs(config.get(DORIS_REQUEST_CONNECT_TIMEOUT_MS)); + builder.requestReadTimeoutMs(config.get(DORIS_REQUEST_READ_TIMEOUT_MS)); + builder.requestQueryTimeoutS(config.get(DORIS_REQUEST_QUERY_TIMEOUT_S)); + builder.requestRetries(config.get(DORIS_REQUEST_RETRIES)); + builder.deserializeArrowAsync(config.get(DORIS_DESERIALIZE_ARROW_ASYNC)); + builder.deserializeQueueSize(config.get(DORIS_DESERIALIZE_QUEUE_SIZE)); + return builder.build(); + } +} diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSourceOptions.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSourceOptions.java new file mode 100644 index 00000000000..2ee852ffccc --- /dev/null +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSourceOptions.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.doris.config; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; + +import java.util.List; + +public interface DorisSourceOptions { + + int DORIS_TABLET_SIZE_MIN = 1; + int DORIS_TABLET_SIZE_DEFAULT = Integer.MAX_VALUE; + int DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT = 30 * 1000; + int DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT = 30 * 1000; + int DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT = 3600; + int DORIS_REQUEST_RETRIES_DEFAULT = 3; + Boolean DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT = false; + int DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT = 64; + long DORIS_EXEC_MEM_LIMIT_DEFAULT = 2147483648L; + + Option> TABLE_LIST = + Options.key("table_list") + .listType(DorisTableConfig.class) + .noDefaultValue() + .withDescription("table list config."); + + Option DORIS_READ_FIELD = + Options.key("doris.read.field") + .stringType() + .noDefaultValue() + .withDescription( + "List of column names in the Doris table, separated by commas"); + Option DORIS_FILTER_QUERY = + Options.key("doris.filter.query") + .stringType() + .noDefaultValue() + .withDescription( + "Filter expression of the query, which is transparently transmitted to Doris. Doris uses this expression to complete source-side data filtering"); + + Option DORIS_TABLET_SIZE = + Options.key("doris.request.tablet.size") + .intType() + .defaultValue(DORIS_TABLET_SIZE_DEFAULT) + .withDescription(""); + + Option DORIS_REQUEST_CONNECT_TIMEOUT_MS = + Options.key("doris.request.connect.timeout.ms") + .intType() + .defaultValue(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT) + .withDescription(""); + + Option DORIS_REQUEST_READ_TIMEOUT_MS = + Options.key("doris.request.read.timeout.ms") + .intType() + .defaultValue(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT) + .withDescription(""); + + Option DORIS_REQUEST_QUERY_TIMEOUT_S = + Options.key("doris.request.query.timeout.s") + .intType() + .defaultValue(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT) + .withDescription(""); + + Option DORIS_REQUEST_RETRIES = + Options.key("doris.request.retries") + .intType() + .defaultValue(DORIS_REQUEST_RETRIES_DEFAULT) + .withDescription(""); + + Option DORIS_DESERIALIZE_ARROW_ASYNC = + Options.key("doris.deserialize.arrow.async") + .booleanType() + .defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT) + .withDescription(""); + + Option DORIS_DESERIALIZE_QUEUE_SIZE = + Options.key("doris.request.retriesdoris.deserialize.queue.size") + .intType() + .defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT) + .withDescription(""); + + Option DORIS_EXEC_MEM_LIMIT = + Options.key("doris.exec.mem.limit") + .longType() + .defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT) + .withDescription(""); +} diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisTableConfig.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisTableConfig.java new file mode 100644 index 00000000000..624d25636b2 --- /dev/null +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisTableConfig.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.doris.config; + +import org.apache.seatunnel.shade.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.seatunnel.shade.com.fasterxml.jackson.annotation.JsonProperty; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; + +import org.apache.commons.lang3.StringUtils; + +import lombok.Builder; +import lombok.Data; +import lombok.experimental.Tolerate; + +import java.io.Serializable; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DATABASE; +import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DORIS_BATCH_SIZE; +import static org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE; +import static org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_EXEC_MEM_LIMIT; +import static org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_FILTER_QUERY; +import static org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_READ_FIELD; +import static org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_TABLET_SIZE; +import static org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.TABLE_LIST; + +@Data +@Builder +@JsonIgnoreProperties(ignoreUnknown = true) +public class DorisTableConfig implements Serializable { + + @JsonProperty("table") + private String table; + + @JsonProperty("database") + private String database; + + @JsonProperty("doris.read.field") + private String readField; + + @JsonProperty("doris.filter.query") + private String filterQuery; + + @JsonProperty("doris.batch.size") + private int batchSize; + + @JsonProperty("doris.request.tablet.size") + private int tabletSize; + + @JsonProperty("doris.exec.mem.limit") + private long execMemLimit; + + @Tolerate + public DorisTableConfig() {} + + public static List of(ReadonlyConfig connectorConfig) { + List tableList; + if (connectorConfig.getOptional(TABLE_LIST).isPresent()) { + tableList = connectorConfig.get(TABLE_LIST); + } else { + DorisTableConfig tableProperty = + DorisTableConfig.builder() + .table(connectorConfig.get(TABLE)) + .database(connectorConfig.get(DATABASE)) + .readField(connectorConfig.get(DORIS_READ_FIELD)) + .filterQuery(connectorConfig.get(DORIS_FILTER_QUERY)) + .batchSize(connectorConfig.get(DORIS_BATCH_SIZE)) + .tabletSize(connectorConfig.get(DORIS_TABLET_SIZE)) + .execMemLimit(connectorConfig.get(DORIS_EXEC_MEM_LIMIT)) + .build(); + tableList = Collections.singletonList(tableProperty); + } + + if (tableList.size() > 1) { + List tableIds = + tableList.stream() + .map(DorisTableConfig::getTableIdentifier) + .collect(Collectors.toList()); + Set tableIdSet = new HashSet<>(tableIds); + if (tableIdSet.size() < tableList.size() - 1) { + throw new IllegalArgumentException( + "Please configure unique `database`.`table`, not allow null/duplicate: " + + tableIds); + } + } + + for (DorisTableConfig dorisTableConfig : tableList) { + if (StringUtils.isBlank(dorisTableConfig.getDatabase())) { + throw new IllegalArgumentException( + "Please configure `database`, not allow null database in config."); + } + if (StringUtils.isBlank(dorisTableConfig.getTable())) { + throw new IllegalArgumentException( + "Please configure `table`, not allow null table in config."); + } + if (dorisTableConfig.getBatchSize() <= 0) { + dorisTableConfig.setBatchSize(DORIS_BATCH_SIZE.defaultValue()); + } + if (dorisTableConfig.getExecMemLimit() <= 0) { + dorisTableConfig.setExecMemLimit(DORIS_EXEC_MEM_LIMIT.defaultValue()); + } + if (dorisTableConfig.getTabletSize() <= 0) { + dorisTableConfig.setTabletSize(DORIS_TABLET_SIZE.defaultValue()); + } + } + return tableList; + } + + public String getTableIdentifier() { + return String.format("%s.%s", database, table); + } +} diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/AbstractDorisTypeConverter.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/AbstractDorisTypeConverter.java index e6b9b95361d..67266b453f5 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/AbstractDorisTypeConverter.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/AbstractDorisTypeConverter.java @@ -26,12 +26,13 @@ import org.apache.seatunnel.api.table.type.DecimalType; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.common.exception.CommonError; -import org.apache.seatunnel.connectors.doris.config.DorisConfig; import lombok.extern.slf4j.Slf4j; import java.util.Locale; +import static org.apache.seatunnel.connectors.doris.config.DorisOptions.IDENTIFIER; + @Slf4j public abstract class AbstractDorisTypeConverter implements TypeConverter { public static final String DORIS_NULL = "NULL"; @@ -186,7 +187,7 @@ public void sampleTypeConverter( break; default: throw CommonError.convertToSeaTunnelTypeError( - DorisConfig.IDENTIFIER, dorisColumnType, typeDefine.getName()); + IDENTIFIER, dorisColumnType, typeDefine.getName()); } } @@ -234,7 +235,7 @@ protected void sampleReconvertString( } throw CommonError.convertToConnectorTypeError( - DorisConfig.IDENTIFIER, column.getDataType().getSqlType().name(), column.getName()); + IDENTIFIER, column.getDataType().getSqlType().name(), column.getName()); } protected BasicTypeDefine sampleReconvert( @@ -366,9 +367,7 @@ protected BasicTypeDefine sampleReconvert( break; default: throw CommonError.convertToConnectorTypeError( - DorisConfig.IDENTIFIER, - column.getDataType().getSqlType().name(), - column.getName()); + IDENTIFIER, column.getDataType().getSqlType().name(), column.getName()); } return builder.build(); } @@ -430,7 +429,7 @@ private void reconvertBuildArrayInternal( break; default: throw CommonError.convertToConnectorTypeError( - DorisConfig.IDENTIFIER, elementType.getSqlType().name(), columnName); + IDENTIFIER, elementType.getSqlType().name(), columnName); } } diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV1.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV1.java index fb129249702..9b7e98368fb 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV1.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV1.java @@ -23,11 +23,12 @@ import org.apache.seatunnel.api.table.converter.TypeConverter; import org.apache.seatunnel.api.table.type.DecimalType; import org.apache.seatunnel.api.table.type.LocalTimeType; -import org.apache.seatunnel.connectors.doris.config.DorisConfig; import com.google.auto.service.AutoService; import lombok.extern.slf4j.Slf4j; +import static org.apache.seatunnel.connectors.doris.config.DorisOptions.IDENTIFIER; + /** Doris type converter for version 1.2.x */ @Slf4j @AutoService(TypeConverter.class) @@ -42,7 +43,7 @@ public class DorisTypeConverterV1 extends AbstractDorisTypeConverter { @Override public String identifier() { - return DorisConfig.IDENTIFIER; + return IDENTIFIER; } @Override diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV2.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV2.java index 3b5ebde0f47..46ae79251e0 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV2.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV2.java @@ -28,7 +28,6 @@ import org.apache.seatunnel.api.table.type.MapType; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.common.exception.CommonError; -import org.apache.seatunnel.connectors.doris.config.DorisConfig; import com.google.auto.service.AutoService; import lombok.extern.slf4j.Slf4j; @@ -37,6 +36,8 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import static org.apache.seatunnel.connectors.doris.config.DorisOptions.IDENTIFIER; + /** Doris type converter for version 2.x */ @Slf4j @AutoService(TypeConverter.class) @@ -62,7 +63,7 @@ public class DorisTypeConverterV2 extends AbstractDorisTypeConverter { @Override public String identifier() { - return DorisConfig.IDENTIFIER; + return IDENTIFIER; } @Override @@ -166,7 +167,7 @@ private void convertArray( DecimalArrayType decimalArray = new DecimalArrayType(new DecimalType(20, 0)); builder.dataType(decimalArray); } else { - throw CommonError.convertToSeaTunnelTypeError(DorisConfig.IDENTIFIER, columnType, name); + throw CommonError.convertToSeaTunnelTypeError(IDENTIFIER, columnType, name); } } diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java index b516157443a..97fd3ca78e9 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java @@ -18,12 +18,13 @@ package org.apache.seatunnel.connectors.doris.rest; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.connectors.doris.config.DorisConfig; -import org.apache.seatunnel.connectors.doris.config.DorisOptions; +import org.apache.seatunnel.connectors.doris.config.DorisSourceConfig; +import org.apache.seatunnel.connectors.doris.config.DorisSourceOptions; import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode; import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException; import org.apache.seatunnel.connectors.doris.rest.models.QueryPlan; import org.apache.seatunnel.connectors.doris.rest.models.Tablet; +import org.apache.seatunnel.connectors.doris.source.DorisSourceTable; import org.apache.seatunnel.connectors.doris.util.ErrorMessages; import org.apache.commons.io.IOUtils; @@ -69,11 +70,12 @@ public class RestService implements Serializable { private static final String QUERY_PLAN = "_query_plan"; private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static String send(DorisConfig dorisConfig, HttpRequestBase request, Logger logger) + private static String send( + DorisSourceConfig dorisSourceConfig, HttpRequestBase request, Logger logger) throws DorisConnectorException { - int connectTimeout = dorisConfig.getRequestConnectTimeoutMs(); - int socketTimeout = dorisConfig.getRequestReadTimeoutMs(); - int retries = dorisConfig.getRequestRetries(); + int connectTimeout = dorisSourceConfig.getRequestConnectTimeoutMs(); + int socketTimeout = dorisSourceConfig.getRequestReadTimeoutMs(); + int retries = dorisSourceConfig.getRequestRetries(); logger.trace( "connect timeout set to '{}'. socket timeout set to '{}'. retries set to '{}'.", connectTimeout, @@ -90,7 +92,7 @@ private static String send(DorisConfig dorisConfig, HttpRequestBase request, Log logger.info( "Send request to Doris FE '{}' with user '{}'.", request.getURI(), - dorisConfig.getUsername()); + dorisSourceConfig.getUsername()); IOException ex = null; int statusCode = -1; @@ -102,15 +104,15 @@ private static String send(DorisConfig dorisConfig, HttpRequestBase request, Log response = getConnectionGet( request.getURI().toString(), - dorisConfig.getUsername(), - dorisConfig.getPassword(), + dorisSourceConfig.getUsername(), + dorisSourceConfig.getPassword(), logger); } else { response = getConnectionPost( request, - dorisConfig.getUsername(), - dorisConfig.getPassword(), + dorisSourceConfig.getUsername(), + dorisSourceConfig.getPassword(), logger); } if (StringUtils.isEmpty(response)) { @@ -251,11 +253,16 @@ public static String randomEndpoint(String feNodes, Logger logger) } @VisibleForTesting - static String getUriStr(DorisConfig dorisConfig, Logger logger) throws DorisConnectorException { - String tableIdentifier = dorisConfig.getDatabase() + "." + dorisConfig.getTable(); + static String getUriStr( + DorisSourceConfig dorisSourceConfig, DorisSourceTable dorisSourceTable, Logger logger) + throws DorisConnectorException { + String tableIdentifier = + dorisSourceTable.getTablePath().getDatabaseName() + + "." + + dorisSourceTable.getTablePath().getTableName(); String[] identifier = parseIdentifier(tableIdentifier, logger); return "http://" - + randomEndpoint(dorisConfig.getFrontends(), logger) + + randomEndpoint(dorisSourceConfig.getFrontends(), logger) + API_PREFIX + "/" + identifier[0] @@ -265,9 +272,13 @@ static String getUriStr(DorisConfig dorisConfig, Logger logger) throws DorisConn } public static List findPartitions( - SeaTunnelRowType rowType, DorisConfig dorisConfig, Logger logger) + DorisSourceConfig dorisSourceConfig, DorisSourceTable dorisSourceTable, Logger logger) throws DorisConnectorException { - String tableIdentifier = dorisConfig.getDatabase() + "." + dorisConfig.getTable(); + String tableIdentifier = + dorisSourceTable.getTablePath().getDatabaseName() + + "." + + dorisSourceTable.getTablePath().getTableName(); + SeaTunnelRowType rowType = dorisSourceTable.getCatalogTable().getSeaTunnelRowType(); String[] tableIdentifiers = parseIdentifier(tableIdentifier, logger); String readFields = "*"; if (rowType.getFieldNames().length != 0) { @@ -281,12 +292,13 @@ public static List findPartitions( + "`.`" + tableIdentifiers[1] + "`"; - if (!StringUtils.isEmpty(dorisConfig.getFilterQuery())) { - sql += " where " + dorisConfig.getFilterQuery(); + if (!StringUtils.isEmpty(dorisSourceTable.getFilterQuery())) { + sql += " where " + dorisSourceTable.getFilterQuery(); } logger.debug("Query SQL Sending to Doris FE is: '{}'.", sql); - HttpPost httpPost = new HttpPost(getUriStr(dorisConfig, logger) + QUERY_PLAN); + HttpPost httpPost = + new HttpPost(getUriStr(dorisSourceConfig, dorisSourceTable, logger) + QUERY_PLAN); String entity = "{\"sql\": \"" + sql + "\"}"; logger.debug("Post body Sending to Doris FE is: '{}'.", entity); StringEntity stringEntity = new StringEntity(entity, StandardCharsets.UTF_8); @@ -294,12 +306,12 @@ public static List findPartitions( stringEntity.setContentType("application/json"); httpPost.setEntity(stringEntity); - String resStr = send(dorisConfig, httpPost, logger); + String resStr = send(dorisSourceConfig, httpPost, logger); logger.debug("Find partition response is '{}'.", resStr); QueryPlan queryPlan = getQueryPlan(resStr, logger); Map> be2Tablets = selectBeForTablet(queryPlan, logger); return tabletsMapToPartition( - dorisConfig, + dorisSourceTable, be2Tablets, queryPlan.getOpaqued_query_plan(), tableIdentifiers[0], @@ -397,18 +409,18 @@ static Map> selectBeForTablet(QueryPlan queryPlan, Logger log } @VisibleForTesting - static int tabletCountLimitForOnePartition(DorisConfig dorisConfig, Logger logger) { - int tabletsSize = DorisOptions.DORIS_TABLET_SIZE_DEFAULT; - if (dorisConfig.getTabletSize() != null) { - tabletsSize = dorisConfig.getTabletSize(); + static int tabletCountLimitForOnePartition(DorisSourceTable dorisSourceTable, Logger logger) { + int tabletsSize = DorisSourceOptions.DORIS_TABLET_SIZE_DEFAULT; + if (dorisSourceTable.getTabletSize() != null) { + tabletsSize = dorisSourceTable.getTabletSize(); } - if (tabletsSize < DorisOptions.DORIS_TABLET_SIZE_MIN) { + if (tabletsSize < DorisSourceOptions.DORIS_TABLET_SIZE_MIN) { logger.warn( "{} is less than {}, set to default value {}.", - DorisOptions.DORIS_TABLET_SIZE, - DorisOptions.DORIS_TABLET_SIZE_MIN, - DorisOptions.DORIS_TABLET_SIZE_MIN); - tabletsSize = DorisOptions.DORIS_TABLET_SIZE_MIN; + DorisSourceOptions.DORIS_TABLET_SIZE, + DorisSourceOptions.DORIS_TABLET_SIZE_MIN, + DorisSourceOptions.DORIS_TABLET_SIZE_MIN); + tabletsSize = DorisSourceOptions.DORIS_TABLET_SIZE_MIN; } logger.debug("Tablet size is set to {}.", tabletsSize); return tabletsSize; @@ -416,14 +428,14 @@ static int tabletCountLimitForOnePartition(DorisConfig dorisConfig, Logger logge @VisibleForTesting static List tabletsMapToPartition( - DorisConfig dorisConfig, + DorisSourceTable dorisSourceTable, Map> be2Tablets, String opaquedQueryPlan, String database, String table, Logger logger) throws DorisConnectorException { - int tabletsSize = tabletCountLimitForOnePartition(dorisConfig, logger); + int tabletsSize = tabletCountLimitForOnePartition(dorisSourceTable, logger); List partitions = new ArrayList<>(); for (Map.Entry> beInfo : be2Tablets.entrySet()) { logger.debug("Generate partition with beInfo: '{}'.", beInfo); diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java index c0a9a2a5a17..deb88a51b11 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java @@ -33,8 +33,8 @@ import org.apache.seatunnel.api.table.factory.CatalogFactory; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.common.constants.PluginType; -import org.apache.seatunnel.connectors.doris.config.DorisConfig; -import org.apache.seatunnel.connectors.doris.config.DorisOptions; +import org.apache.seatunnel.connectors.doris.config.DorisSinkConfig; +import org.apache.seatunnel.connectors.doris.config.DorisSinkOptions; import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException; import org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitInfo; import org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitInfoSerializer; @@ -55,7 +55,7 @@ public class DorisSink SupportSaveMode, SupportMultiTableSink { - private final DorisConfig dorisConfig; + private final DorisSinkConfig dorisSinkConfig; private final ReadonlyConfig config; private final CatalogTable catalogTable; private String jobId; @@ -63,7 +63,7 @@ public class DorisSink public DorisSink(ReadonlyConfig config, CatalogTable catalogTable) { this.config = config; this.catalogTable = catalogTable; - this.dorisConfig = DorisConfig.of(config); + this.dorisSinkConfig = DorisSinkConfig.of(config); } @Override @@ -79,13 +79,13 @@ public void setJobContext(JobContext jobContext) { @Override public DorisSinkWriter createWriter(SinkWriter.Context context) throws IOException { return new DorisSinkWriter( - context, Collections.emptyList(), catalogTable, dorisConfig, jobId); + context, Collections.emptyList(), catalogTable, dorisSinkConfig, jobId); } @Override public SinkWriter restoreWriter( SinkWriter.Context context, List states) throws IOException { - return new DorisSinkWriter(context, states, catalogTable, dorisConfig, jobId); + return new DorisSinkWriter(context, states, catalogTable, dorisSinkConfig, jobId); } @Override @@ -95,7 +95,7 @@ public Optional> getWriterStateSerializer() { @Override public Optional> createCommitter() throws IOException { - return Optional.of(new DorisCommitter(dorisConfig)); + return Optional.of(new DorisCommitter(dorisSinkConfig)); } @Override @@ -127,11 +127,11 @@ public Optional getSaveModeHandler() { Catalog catalog = catalogFactory.createCatalog(catalogFactory.factoryIdentifier(), config); return Optional.of( new DefaultSaveModeHandler( - config.get(DorisOptions.SCHEMA_SAVE_MODE), - config.get(DorisOptions.DATA_SAVE_MODE), + config.get(DorisSinkOptions.SCHEMA_SAVE_MODE), + config.get(DorisSinkOptions.DATA_SAVE_MODE), catalog, catalogTable, - config.get(DorisOptions.CUSTOM_SQL))); + config.get(DorisSinkOptions.CUSTOM_SQL))); } @Override diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java index e1849c39341..9a2ce67be27 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java @@ -26,7 +26,7 @@ import org.apache.seatunnel.api.table.factory.TableSinkFactory; import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.connectors.doris.config.DorisOptions; +import org.apache.seatunnel.connectors.doris.config.DorisSinkOptions; import org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitInfo; import org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkState; import org.apache.seatunnel.connectors.doris.util.UnsupportedTypeConverterUtils; @@ -39,15 +39,14 @@ import java.util.List; import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DATABASE; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.NEEDS_UNSUPPORTED_TYPE_CASTING; +import static org.apache.seatunnel.connectors.doris.config.DorisOptions.IDENTIFIER; import static org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE; import static org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE_IDENTIFIER; +import static org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.NEEDS_UNSUPPORTED_TYPE_CASTING; @AutoService(Factory.class) public class DorisSinkFactory implements TableSinkFactory { - public static final String IDENTIFIER = "Doris"; - @Override public String factoryIdentifier() { return IDENTIFIER; @@ -55,12 +54,12 @@ public String factoryIdentifier() { @Override public OptionRule optionRule() { - return DorisOptions.SINK_RULE.build(); + return DorisSinkOptions.SINK_RULE.build(); } @Override public List excludeTablePlaceholderReplaceKeys() { - return Arrays.asList(DorisOptions.SAVE_MODE_CREATE_TEMPLATE.key()); + return Arrays.asList(DorisSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key()); } @Override diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/committer/DorisCommitter.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/committer/DorisCommitter.java index 5c6e81ba7e2..b92f2869bc9 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/committer/DorisCommitter.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/committer/DorisCommitter.java @@ -18,7 +18,7 @@ package org.apache.seatunnel.connectors.doris.sink.committer; import org.apache.seatunnel.api.sink.SinkCommitter; -import org.apache.seatunnel.connectors.doris.config.DorisConfig; +import org.apache.seatunnel.connectors.doris.config.DorisSinkConfig; import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode; import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException; import org.apache.seatunnel.connectors.doris.sink.HttpPutBuilder; @@ -46,15 +46,15 @@ public class DorisCommitter implements SinkCommitter { private static final String COMMIT_PATTERN = "http://%s/api/%s/_stream_load_2pc"; private static final int HTTP_TEMPORARY_REDIRECT = 200; private final CloseableHttpClient httpClient; - private final DorisConfig dorisConfig; + private final DorisSinkConfig dorisSinkConfig; int maxRetry; - public DorisCommitter(DorisConfig dorisConfig) { - this(dorisConfig, new HttpUtil().getHttpClient()); + public DorisCommitter(DorisSinkConfig dorisSinkConfig) { + this(dorisSinkConfig, new HttpUtil().getHttpClient()); } - public DorisCommitter(DorisConfig dorisConfig, CloseableHttpClient client) { - this.dorisConfig = dorisConfig; + public DorisCommitter(DorisSinkConfig dorisSinkConfig, CloseableHttpClient client) { + this.dorisSinkConfig = dorisSinkConfig; this.httpClient = client; } @@ -80,11 +80,11 @@ private void commitTransaction(DorisCommitInfo committable) int retry = 0; String hostPort = committable.getHostPort(); CloseableHttpResponse response = null; - while (retry++ <= dorisConfig.getMaxRetries()) { + while (retry++ <= dorisSinkConfig.getMaxRetries()) { HttpPutBuilder putBuilder = new HttpPutBuilder(); putBuilder .setUrl(String.format(COMMIT_PATTERN, hostPort, committable.getDb())) - .baseAuth(dorisConfig.getUsername(), dorisConfig.getPassword()) + .baseAuth(dorisSinkConfig.getUsername(), dorisSinkConfig.getPassword()) .addCommonHeader() .addTxnId(committable.getTxbID()) .setEmptyEntity() @@ -93,14 +93,14 @@ private void commitTransaction(DorisCommitInfo committable) response = httpClient.execute(putBuilder.build()); } catch (IOException e) { log.error("commit transaction failed: ", e); - hostPort = dorisConfig.getFrontends(); + hostPort = dorisSinkConfig.getFrontends(); continue; } statusCode = response.getStatusLine().getStatusCode(); reasonPhrase = response.getStatusLine().getReasonPhrase(); if (statusCode != HTTP_TEMPORARY_REDIRECT) { log.warn("commit failed with {}, reason {}", hostPort, reasonPhrase); - hostPort = dorisConfig.getFrontends(); + hostPort = dorisSinkConfig.getFrontends(); } else { break; } @@ -139,7 +139,7 @@ private void abortTransaction(DorisCommitInfo committable) while (retry++ <= maxRetry) { HttpPutBuilder builder = new HttpPutBuilder(); builder.setUrl(String.format(COMMIT_PATTERN, hostPort, committable.getDb())) - .baseAuth(dorisConfig.getUsername(), dorisConfig.getPassword()) + .baseAuth(dorisSinkConfig.getUsername(), dorisSinkConfig.getPassword()) .addCommonHeader() .addTxnId(committable.getTxbID()) .setEmptyEntity() diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java index b5aa5274216..f6dfae55346 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java @@ -22,7 +22,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.connectors.doris.config.DorisConfig; +import org.apache.seatunnel.connectors.doris.config.DorisSinkConfig; import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode; import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException; import org.apache.seatunnel.connectors.doris.rest.RestService; @@ -59,7 +59,7 @@ public class DorisSinkWriter new ArrayList<>(Arrays.asList(LoadStatus.SUCCESS, LoadStatus.PUBLISH_TIMEOUT)); private long lastCheckpointId; private DorisStreamLoad dorisStreamLoad; - private final DorisConfig dorisConfig; + private final DorisSinkConfig dorisSinkConfig; private final String labelPrefix; private final LabelGenerator labelGenerator; private final int intervalTime; @@ -72,41 +72,41 @@ public DorisSinkWriter( SinkWriter.Context context, List state, CatalogTable catalogTable, - DorisConfig dorisConfig, + DorisSinkConfig dorisSinkConfig, String jobId) { - this.dorisConfig = dorisConfig; + this.dorisSinkConfig = dorisSinkConfig; this.catalogTable = catalogTable; this.lastCheckpointId = !state.isEmpty() ? state.get(0).getCheckpointId() : 0; log.info("restore checkpointId {}", lastCheckpointId); - log.info("labelPrefix " + dorisConfig.getLabelPrefix()); + log.info("labelPrefix " + dorisSinkConfig.getLabelPrefix()); this.labelPrefix = - dorisConfig.getLabelPrefix() + dorisSinkConfig.getLabelPrefix() + "_" + catalogTable.getTablePath().getFullName().replaceAll("\\.", "_") + "_" + jobId + "_" + context.getIndexOfSubtask(); - this.labelGenerator = new LabelGenerator(labelPrefix, dorisConfig.getEnable2PC()); + this.labelGenerator = new LabelGenerator(labelPrefix, dorisSinkConfig.getEnable2PC()); this.scheduledExecutorService = new ScheduledThreadPoolExecutor( 1, new ThreadFactoryBuilder().setNameFormat("stream-load-check").build()); - this.serializer = createSerializer(dorisConfig, catalogTable.getSeaTunnelRowType()); - this.intervalTime = dorisConfig.getCheckInterval(); + this.serializer = createSerializer(dorisSinkConfig, catalogTable.getSeaTunnelRowType()); + this.intervalTime = dorisSinkConfig.getCheckInterval(); this.initializeLoad(); } private void initializeLoad() { - String backend = RestService.randomEndpoint(dorisConfig.getFrontends(), log); + String backend = RestService.randomEndpoint(dorisSinkConfig.getFrontends(), log); try { this.dorisStreamLoad = new DorisStreamLoad( backend, catalogTable.getTablePath(), - dorisConfig, + dorisSinkConfig, labelGenerator, new HttpUtil().getHttpClient()); - if (dorisConfig.getEnable2PC()) { + if (dorisSinkConfig.getEnable2PC()) { dorisStreamLoad.abortPreCommit(labelPrefix, lastCheckpointId + 1); } } catch (Exception e) { @@ -124,15 +124,15 @@ public void write(SeaTunnelRow element) throws IOException { checkLoadException(); byte[] serialize = serializer.serialize( - dorisConfig.isNeedsUnsupportedTypeCasting() + dorisSinkConfig.isNeedsUnsupportedTypeCasting() ? UnsupportedTypeConverterUtils.convertRow(element) : element); if (Objects.isNull(serialize)) { return; } dorisStreamLoad.writeRecord(serialize); - if (!dorisConfig.getEnable2PC() - && dorisStreamLoad.getRecordCount() >= dorisConfig.getBatchSize()) { + if (!dorisSinkConfig.getEnable2PC() + && dorisStreamLoad.getRecordCount() >= dorisSinkConfig.getBatchSize()) { flush(); startLoad(labelGenerator.generateLabel(lastCheckpointId)); } @@ -141,7 +141,7 @@ public void write(SeaTunnelRow element) throws IOException { @Override public Optional prepareCommit() throws IOException { RespContent respContent = flush(); - if (!dorisConfig.getEnable2PC() || respContent == null) { + if (!dorisSinkConfig.getEnable2PC() || respContent == null) { return Optional.empty(); } long txnId = respContent.getTxnId(); @@ -178,7 +178,7 @@ private void startLoad(String label) { @Override public void abortPrepare() { - if (dorisConfig.getEnable2PC()) { + if (dorisSinkConfig.getEnable2PC()) { try { dorisStreamLoad.abortPreCommit(labelPrefix, lastCheckpointId + 1); } catch (Exception e) { @@ -208,7 +208,7 @@ private void checkLoadException() { @Override public void close() throws IOException { - if (!dorisConfig.getEnable2PC()) { + if (!dorisSinkConfig.getEnable2PC()) { flush(); } if (scheduledExecutorService != null) { @@ -220,14 +220,14 @@ public void close() throws IOException { } private DorisSerializer createSerializer( - DorisConfig dorisConfig, SeaTunnelRowType seaTunnelRowType) { + DorisSinkConfig dorisSinkConfig, SeaTunnelRowType seaTunnelRowType) { return new SeaTunnelRowSerializer( - dorisConfig + dorisSinkConfig .getStreamLoadProps() .getProperty(LoadConstants.FORMAT_KEY) .toLowerCase(), seaTunnelRowType, - dorisConfig.getStreamLoadProps().getProperty(LoadConstants.FIELD_DELIMITER_KEY), - dorisConfig.getEnableDelete()); + dorisSinkConfig.getStreamLoadProps().getProperty(LoadConstants.FIELD_DELIMITER_KEY), + dorisSinkConfig.getEnableDelete()); } } diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java index 8ec59e81ece..1e0ee7b9c2e 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java @@ -22,7 +22,7 @@ import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.common.utils.ExceptionUtils; import org.apache.seatunnel.common.utils.JsonUtils; -import org.apache.seatunnel.connectors.doris.config.DorisConfig; +import org.apache.seatunnel.connectors.doris.config.DorisSinkConfig; import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode; import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException; import org.apache.seatunnel.connectors.doris.rest.models.RespContent; @@ -89,20 +89,20 @@ public class DorisStreamLoad implements Serializable { public DorisStreamLoad( String hostPort, TablePath tablePath, - DorisConfig dorisConfig, + DorisSinkConfig dorisSinkConfig, LabelGenerator labelGenerator, CloseableHttpClient httpClient) { this.hostPort = hostPort; this.db = tablePath.getDatabaseName(); this.table = tablePath.getTableName(); - this.user = dorisConfig.getUsername(); - this.passwd = dorisConfig.getPassword(); + this.user = dorisSinkConfig.getUsername(); + this.passwd = dorisSinkConfig.getPassword(); this.labelGenerator = labelGenerator; this.loadUrlStr = String.format(LOAD_URL_PATTERN, hostPort, db, table); this.abortUrlStr = String.format(ABORT_URL_PATTERN, hostPort, db); - this.enable2PC = dorisConfig.getEnable2PC(); - this.streamLoadProp = dorisConfig.getStreamLoadProps(); - this.enableDelete = dorisConfig.getEnableDelete(); + this.enable2PC = dorisSinkConfig.getEnable2PC(); + this.streamLoadProp = dorisSinkConfig.getStreamLoadProps(); + this.enableDelete = dorisSinkConfig.getEnableDelete(); this.httpClient = httpClient; this.executorService = new ThreadPoolExecutor( @@ -113,7 +113,7 @@ public DorisStreamLoad( new LinkedBlockingQueue<>(), new ThreadFactoryBuilder().setNameFormat("stream-load-upload").build()); this.recordStream = - new RecordStream(dorisConfig.getBufferSize(), dorisConfig.getBufferCount()); + new RecordStream(dorisSinkConfig.getBufferSize(), dorisSinkConfig.getBufferCount()); lineDelimiter = streamLoadProp.getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT).getBytes(); loadBatchFirstRecord = true; diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSource.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSource.java index c04f074021a..8b5f168a2d5 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSource.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSource.java @@ -17,34 +17,36 @@ package org.apache.seatunnel.connectors.doris.source; -import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.source.Boundedness; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.source.SourceReader; import org.apache.seatunnel.api.source.SourceSplitEnumerator; import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.connectors.doris.config.DorisConfig; +import org.apache.seatunnel.connectors.doris.config.DorisSourceConfig; import org.apache.seatunnel.connectors.doris.source.reader.DorisSourceReader; import org.apache.seatunnel.connectors.doris.source.split.DorisSourceSplit; import org.apache.seatunnel.connectors.doris.source.split.DorisSourceSplitEnumerator; import lombok.extern.slf4j.Slf4j; -import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; @Slf4j public class DorisSource implements SeaTunnelSource { private static final long serialVersionUID = 6139826339248788618L; - private final DorisConfig config; - private final CatalogTable catalogTable; + private final DorisSourceConfig config; + private final Map dorisSourceTables; - public DorisSource(ReadonlyConfig config, CatalogTable catalogTable) { - this.config = DorisConfig.of(config); - this.catalogTable = catalogTable; + public DorisSource( + DorisSourceConfig config, Map dorisSourceTables) { + this.config = config; + this.dorisSourceTables = dorisSourceTables; } @Override @@ -59,20 +61,21 @@ public Boundedness getBoundedness() { @Override public List getProducedCatalogTables() { - return Collections.singletonList(catalogTable); + return dorisSourceTables.values().stream() + .map(DorisSourceTable::getCatalogTable) + .collect(Collectors.toList()); } @Override public SourceReader createReader( SourceReader.Context readerContext) { - return new DorisSourceReader(readerContext, config, catalogTable.getSeaTunnelRowType()); + return new DorisSourceReader(readerContext, config, dorisSourceTables); } @Override public SourceSplitEnumerator createEnumerator( SourceSplitEnumerator.Context enumeratorContext) { - return new DorisSourceSplitEnumerator( - enumeratorContext, config, catalogTable.getSeaTunnelRowType()); + return new DorisSourceSplitEnumerator(enumeratorContext, config, dorisSourceTables); } @Override @@ -80,6 +83,6 @@ public SourceSplitEnumerator restoreEnumerat SourceSplitEnumerator.Context enumeratorContext, DorisSourceState checkpointState) { return new DorisSourceSplitEnumerator( - enumeratorContext, config, catalogTable.getSeaTunnelRowType(), checkpointState); + enumeratorContext, config, dorisSourceTables, checkpointState); } } diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSourceFactory.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSourceFactory.java index 75cc266edad..506a7c97dc8 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSourceFactory.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSourceFactory.java @@ -17,7 +17,6 @@ package org.apache.seatunnel.connectors.doris.source; -import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.source.SourceSplit; @@ -29,8 +28,8 @@ import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; import org.apache.seatunnel.connectors.doris.catalog.DorisCatalog; import org.apache.seatunnel.connectors.doris.catalog.DorisCatalogFactory; -import org.apache.seatunnel.connectors.doris.config.DorisConfig; -import org.apache.seatunnel.connectors.doris.config.DorisOptions; +import org.apache.seatunnel.connectors.doris.config.DorisSourceConfig; +import org.apache.seatunnel.connectors.doris.config.DorisTableConfig; import org.apache.commons.lang3.StringUtils; @@ -39,62 +38,88 @@ import java.io.Serializable; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; +import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DATABASE; +import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DORIS_BATCH_SIZE; +import static org.apache.seatunnel.connectors.doris.config.DorisOptions.FENODES; +import static org.apache.seatunnel.connectors.doris.config.DorisOptions.IDENTIFIER; +import static org.apache.seatunnel.connectors.doris.config.DorisOptions.PASSWORD; +import static org.apache.seatunnel.connectors.doris.config.DorisOptions.QUERY_PORT; +import static org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE; +import static org.apache.seatunnel.connectors.doris.config.DorisOptions.USERNAME; +import static org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_FILTER_QUERY; +import static org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_READ_FIELD; +import static org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.TABLE_LIST; + @Slf4j @AutoService(Factory.class) public class DorisSourceFactory implements TableSourceFactory { @Override public String factoryIdentifier() { - return DorisConfig.IDENTIFIER; + return IDENTIFIER; } @Override public OptionRule optionRule() { return OptionRule.builder() - .required( - DorisOptions.FENODES, - DorisOptions.USERNAME, - DorisOptions.PASSWORD, - DorisOptions.DATABASE, - DorisOptions.TABLE) - .optional(DorisOptions.DORIS_FILTER_QUERY) - .optional(DorisOptions.DORIS_READ_FIELD) - .optional(DorisOptions.QUERY_PORT) - .optional(DorisOptions.DORIS_BATCH_SIZE) + .required(FENODES, USERNAME, PASSWORD) + .optional(TABLE_LIST) + .optional(DATABASE) + .optional(TABLE) + .optional(DORIS_FILTER_QUERY) + .optional(DORIS_READ_FIELD) + .optional(QUERY_PORT) + .optional(DORIS_BATCH_SIZE) .build(); } @Override public TableSource createSource(TableSourceFactoryContext context) { - ReadonlyConfig options = context.getOptions(); - CatalogTable table; - DorisCatalogFactory dorisCatalogFactory = new DorisCatalogFactory(); - DorisCatalog catalog = (DorisCatalog) dorisCatalogFactory.createCatalog("doris", options); - catalog.open(); - String tableIdentifier = - options.get(DorisOptions.DATABASE) + "." + options.get(DorisOptions.TABLE); - TablePath tablePath = TablePath.of(tableIdentifier); + DorisSourceConfig dorisSourceConfig = DorisSourceConfig.of(context.getOptions()); + List dorisTableConfigList = dorisSourceConfig.getTableConfigList(); + Map dorisSourceTables = new HashMap<>(); + for (DorisTableConfig dorisTableConfig : dorisTableConfigList) { + CatalogTable table; + DorisCatalogFactory dorisCatalogFactory = new DorisCatalogFactory(); + DorisCatalog catalog = + (DorisCatalog) dorisCatalogFactory.createCatalog("doris", context.getOptions()); + catalog.open(); + TablePath tablePath = TablePath.of(dorisTableConfig.getTableIdentifier()); + String readFields = dorisTableConfig.getReadField(); + try { + List readFiledList = null; + if (StringUtils.isNotBlank(readFields)) { + readFiledList = + Arrays.stream(readFields.split(",")) + .map(String::trim) + .collect(Collectors.toList()); + } - try { - String read_fields = options.get(DorisOptions.DORIS_READ_FIELD); - List readFiledList = null; - if (StringUtils.isNotBlank(read_fields)) { - readFiledList = - Arrays.stream(read_fields.split(",")) - .map(String::trim) - .collect(Collectors.toList()); + table = catalog.getTable(tablePath, readFiledList); + } catch (Exception e) { + log.error("create source error"); + throw e; } - - table = catalog.getTable(tablePath, readFiledList); - } catch (Exception e) { - log.error("create source error"); - throw e; + dorisSourceTables.put( + tablePath, + DorisSourceTable.builder() + .catalogTable(table) + .tablePath(tablePath) + .readField(readFields) + .filterQuery(dorisTableConfig.getFilterQuery()) + .batchSize(dorisTableConfig.getBatchSize()) + .tabletSize(dorisTableConfig.getTabletSize()) + .execMemLimit(dorisTableConfig.getExecMemLimit()) + .build()); } - CatalogTable finalTable = table; - return () -> (SeaTunnelSource) new DorisSource(options, finalTable); + return () -> + (SeaTunnelSource) + new DorisSource(dorisSourceConfig, dorisSourceTables); } @Override diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSourceTable.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSourceTable.java new file mode 100644 index 00000000000..b09568db9ed --- /dev/null +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSourceTable.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.doris.source; + +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.TablePath; + +import lombok.Builder; +import lombok.Data; + +import java.io.Serializable; + +@Data +@Builder +public class DorisSourceTable implements Serializable { + private static final long serialVersionUID = 1L; + + private final TablePath tablePath; + private String readField; + private String filterQuery; + private int batchSize; + private Integer tabletSize; + private Long execMemLimit; + private final CatalogTable catalogTable; +} diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/reader/DorisSourceReader.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/reader/DorisSourceReader.java index 66c4e1f269f..ffe1d0e54a0 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/reader/DorisSourceReader.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/reader/DorisSourceReader.java @@ -20,10 +20,13 @@ import org.apache.seatunnel.api.source.Boundedness; import org.apache.seatunnel.api.source.Collector; import org.apache.seatunnel.api.source.SourceReader; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.connectors.doris.config.DorisConfig; +import org.apache.seatunnel.connectors.doris.config.DorisSourceConfig; +import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode; +import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException; import org.apache.seatunnel.connectors.doris.rest.PartitionDefinition; +import org.apache.seatunnel.connectors.doris.source.DorisSourceTable; import org.apache.seatunnel.connectors.doris.source.split.DorisSourceSplit; import lombok.extern.slf4j.Slf4j; @@ -32,27 +35,30 @@ import java.util.ArrayDeque; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Queue; @Slf4j public class DorisSourceReader implements SourceReader { private final Context context; - private final DorisConfig dorisConfig; + private final DorisSourceConfig dorisSourceConfig; private final Queue splitsQueue; private volatile boolean noMoreSplits; private DorisValueReader valueReader; - private SeaTunnelRowType seaTunnelRowType; + private final Map tables; public DorisSourceReader( - Context context, DorisConfig dorisConfig, SeaTunnelRowType seaTunnelRowType) { + Context context, + DorisSourceConfig dorisSourceConfig, + Map tables) { this.splitsQueue = new ArrayDeque<>(); this.context = context; - this.dorisConfig = dorisConfig; - this.seaTunnelRowType = seaTunnelRowType; + this.dorisSourceConfig = dorisSourceConfig; + this.tables = tables; } @Override @@ -71,7 +77,16 @@ public void pollNext(Collector output) throws Exception { DorisSourceSplit nextSplit = splitsQueue.poll(); if (nextSplit != null) { PartitionDefinition partition = nextSplit.getPartitionDefinition(); - valueReader = new DorisValueReader(partition, dorisConfig, seaTunnelRowType); + DorisSourceTable dorisSourceTable = + tables.get(TablePath.of(partition.getDatabase(), partition.getTable())); + if (dorisSourceTable == null) { + throw new DorisConnectorException( + DorisConnectorErrorCode.SHOULD_NEVER_HAPPEN, + String.format( + "the table '%s.%s' cannot be found in table_list of job configuration.", + partition.getDatabase(), partition.getTable())); + } + valueReader = new DorisValueReader(partition, dorisSourceConfig, dorisSourceTable); while (valueReader.hasNext()) { SeaTunnelRow record = valueReader.next(); output.collect(record); diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/reader/DorisValueReader.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/reader/DorisValueReader.java index 18d3d004d94..68d2eecfb51 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/reader/DorisValueReader.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/reader/DorisValueReader.java @@ -20,11 +20,12 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.doris.backend.BackendClient; -import org.apache.seatunnel.connectors.doris.config.DorisConfig; +import org.apache.seatunnel.connectors.doris.config.DorisSourceConfig; import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode; import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException; import org.apache.seatunnel.connectors.doris.rest.PartitionDefinition; import org.apache.seatunnel.connectors.doris.rest.models.Schema; +import org.apache.seatunnel.connectors.doris.source.DorisSourceTable; import org.apache.seatunnel.connectors.doris.source.serialization.Routing; import org.apache.seatunnel.connectors.doris.source.serialization.RowBatch; import org.apache.seatunnel.connectors.doris.util.SchemaUtils; @@ -54,7 +55,8 @@ public class DorisValueReader { protected Lock clientLock = new ReentrantLock(); private PartitionDefinition partition; - private DorisConfig config; + private DorisSourceTable dorisSourceTable; + private DorisSourceConfig config; protected int offset = 0; protected AtomicBoolean eos = new AtomicBoolean(false); @@ -72,12 +74,15 @@ public class DorisValueReader { protected boolean asyncThreadStarted; public DorisValueReader( - PartitionDefinition partition, DorisConfig config, SeaTunnelRowType seaTunnelRowType) { + PartitionDefinition partition, + DorisSourceConfig config, + DorisSourceTable dorisSourceTable) { this.partition = partition; this.config = config; + this.dorisSourceTable = dorisSourceTable; this.client = backendClient(); this.deserializeArrowToRowBatchAsync = config.getDeserializeArrowAsync(); - this.seaTunnelRowType = seaTunnelRowType; + this.seaTunnelRowType = dorisSourceTable.getCatalogTable().getSeaTunnelRowType(); int blockingQueueSize = config.getDeserializeQueueSize(); if (this.deserializeArrowToRowBatchAsync) { this.rowBatchBlockingQueue = new ArrayBlockingQueue<>(blockingQueueSize); @@ -117,9 +122,9 @@ private TScanOpenParams openParams() { params.tablet_ids = Arrays.asList(partition.getTabletIds().toArray(new Long[] {})); params.opaqued_query_plan = partition.getQueryPlan(); // max row number of one read batch - Integer batchSize = config.getBatchSize(); + Integer batchSize = dorisSourceTable.getBatchSize(); Integer queryDorisTimeout = config.getRequestQueryTimeoutS(); - Long execMemLimit = config.getExecMemLimit(); + Long execMemLimit = dorisSourceTable.getExecMemLimit(); params.setBatchSize(batchSize); params.setQueryTimeout(queryDorisTimeout); params.setMemLimit(execMemLimit); @@ -250,7 +255,9 @@ public SeaTunnelRow next() { throw new DorisConnectorException( DorisConnectorErrorCode.SHOULD_NEVER_HAPPEN, "never happen error."); } - return rowBatch.next(); + SeaTunnelRow next = rowBatch.next(); + next.setTableId(dorisSourceTable.getTablePath().toString()); + return next; } public void close() { diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/split/DorisSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/split/DorisSourceSplitEnumerator.java index d2d2e61d7e1..1aa10a88b54 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/split/DorisSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/split/DorisSourceSplitEnumerator.java @@ -18,13 +18,14 @@ package org.apache.seatunnel.connectors.doris.source.split; import org.apache.seatunnel.api.source.SourceSplitEnumerator; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; -import org.apache.seatunnel.connectors.doris.config.DorisConfig; +import org.apache.seatunnel.connectors.doris.config.DorisSourceConfig; import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException; import org.apache.seatunnel.connectors.doris.rest.PartitionDefinition; import org.apache.seatunnel.connectors.doris.rest.RestService; import org.apache.seatunnel.connectors.doris.source.DorisSourceState; +import org.apache.seatunnel.connectors.doris.source.DorisSourceTable; import lombok.extern.slf4j.Slf4j; @@ -41,31 +42,31 @@ public class DorisSourceSplitEnumerator implements SourceSplitEnumerator { - private Context context; - private DorisConfig dorisConfig; + private final Context context; + private final DorisSourceConfig dorisSourceConfig; private volatile boolean shouldEnumerate; private final Map> pendingSplit; - private SeaTunnelRowType seaTunnelRowType; + private final Map dorisSourceTables; private final Object stateLock = new Object(); public DorisSourceSplitEnumerator( Context context, - DorisConfig dorisConfig, - SeaTunnelRowType seaTunnelRowType) { - this(context, dorisConfig, seaTunnelRowType, null); + DorisSourceConfig dorisSourceConfig, + Map dorisSourceTables) { + this(context, dorisSourceConfig, dorisSourceTables, null); } public DorisSourceSplitEnumerator( Context context, - DorisConfig dorisConfig, - SeaTunnelRowType rowType, + DorisSourceConfig dorisSourceConfig, + Map dorisSourceTables, DorisSourceState dorisSourceState) { this.context = context; - this.dorisConfig = dorisConfig; - this.seaTunnelRowType = rowType; + this.dorisSourceConfig = dorisSourceConfig; + this.dorisSourceTables = dorisSourceTables; this.pendingSplit = new ConcurrentHashMap<>(); this.shouldEnumerate = (dorisSourceState == null); if (dorisSourceState != null) { @@ -149,10 +150,12 @@ public void notifyCheckpointComplete(long checkpointId) {} private List getDorisSourceSplit() { List splits = new ArrayList<>(); - List partitions = - RestService.findPartitions(seaTunnelRowType, dorisConfig, log); - for (PartitionDefinition partition : partitions) { - splits.add(new DorisSourceSplit(partition, String.valueOf(partition.hashCode()))); + for (DorisSourceTable dorisSourceTable : dorisSourceTables.values()) { + List partitions = + RestService.findPartitions(dorisSourceConfig, dorisSourceTable, log); + for (PartitionDefinition partition : partitions) { + splits.add(new DorisSourceSplit(partition, String.valueOf(partition.hashCode()))); + } } return splits; } diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java index e4f8804be02..53b38049f98 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java @@ -24,7 +24,7 @@ import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.converter.BasicTypeDefine; import org.apache.seatunnel.api.table.converter.TypeConverter; -import org.apache.seatunnel.connectors.doris.config.DorisOptions; +import org.apache.seatunnel.connectors.doris.config.DorisSinkOptions; import org.apache.seatunnel.connectors.seatunnel.common.sql.template.SqlTemplate; import org.apache.commons.lang3.StringUtils; @@ -155,7 +155,7 @@ public static String getCreateTableStatement( SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder(), primaryKey, tablePath.getFullName(), - DorisOptions.SAVE_MODE_CREATE_TEMPLATE.key()); + DorisSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key()); template = template.replaceAll( SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getReplacePlaceHolder(), @@ -165,7 +165,7 @@ public static String getCreateTableStatement( SaveModePlaceHolder.ROWTYPE_UNIQUE_KEY.getPlaceHolder(), uniqueKey, tablePath.getFullName(), - DorisOptions.SAVE_MODE_CREATE_TEMPLATE.key()); + DorisSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key()); template = template.replaceAll( SaveModePlaceHolder.ROWTYPE_UNIQUE_KEY.getReplacePlaceHolder(), uniqueKey); @@ -174,7 +174,7 @@ public static String getCreateTableStatement( SaveModePlaceHolder.ROWTYPE_DUPLICATE_KEY.getPlaceHolder(), dupKey, tablePath.getFullName(), - DorisOptions.SAVE_MODE_CREATE_TEMPLATE.key()); + DorisSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key()); template = template.replaceAll( SaveModePlaceHolder.ROWTYPE_DUPLICATE_KEY.getReplacePlaceHolder(), dupKey); diff --git a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/DorisCreateTableTest.java b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/DorisCreateTableTest.java index 02b3c5478f4..cdaa55487c6 100644 --- a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/DorisCreateTableTest.java +++ b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/DorisCreateTableTest.java @@ -31,7 +31,7 @@ import org.apache.seatunnel.api.table.type.LocalTimeType; import org.apache.seatunnel.common.exception.CommonError; import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; -import org.apache.seatunnel.connectors.doris.config.DorisOptions; +import org.apache.seatunnel.connectors.doris.config.DorisSinkOptions; import org.apache.seatunnel.connectors.doris.datatype.DorisTypeConverterV1; import org.apache.seatunnel.connectors.doris.util.DorisCatalogUtil; @@ -141,7 +141,7 @@ public void test() { + "\"disable_auto_compaction\" = \"false\"\n" + ")"); - String createTemplate = DorisOptions.SAVE_MODE_CREATE_TEMPLATE.defaultValue(); + String createTemplate = DorisSinkOptions.SAVE_MODE_CREATE_TEMPLATE.defaultValue(); CatalogTable catalogTable = CatalogTable.of( TableIdentifier.of("test", "test1", "test2"), @@ -171,7 +171,7 @@ public void test() { SaveModePlaceHolder.getDisplay(primaryKeyHolder), createTemplate, primaryKeyHolder, - DorisOptions.SAVE_MODE_CREATE_TEMPLATE.key()); + DorisSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key()); Assertions.assertEquals( exceptSeaTunnelRuntimeException.getMessage(), actualSeaTunnelRuntimeException.getMessage()); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/pom.xml index 7a3008adb3a..f1c9f159d5f 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/pom.xml @@ -56,6 +56,12 @@ test-jar test + + org.apache.seatunnel + connector-assert + ${project.version} + test + org.testcontainers diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java index 07fab009083..a2da0bd302b 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java @@ -35,6 +35,8 @@ import org.apache.seatunnel.connectors.doris.catalog.DorisCatalog; import org.apache.seatunnel.connectors.doris.catalog.DorisCatalogFactory; import org.apache.seatunnel.connectors.doris.config.DorisOptions; +import org.apache.seatunnel.connectors.doris.config.DorisSinkOptions; +import org.apache.seatunnel.connectors.doris.config.DorisSourceOptions; import org.apache.seatunnel.connectors.doris.sink.DorisSinkFactory; import org.apache.seatunnel.connectors.doris.source.DorisSourceFactory; @@ -224,7 +226,7 @@ void testSaveMode() { put(DorisOptions.TABLE.key(), "test4"); put(DorisOptions.USERNAME.key(), USERNAME); put(DorisOptions.PASSWORD.key(), PASSWORD); - put(DorisOptions.NEEDS_UNSUPPORTED_TYPE_CASTING.key(), true); + put(DorisSinkOptions.NEEDS_UNSUPPORTED_TYPE_CASTING.key(), true); } }); upstreamTable @@ -282,7 +284,8 @@ public void testDorisSourceSelectFieldsNotLossKeysInformation() { put(DorisOptions.USERNAME.key(), USERNAME); put(DorisOptions.PASSWORD.key(), PASSWORD); put( - DorisOptions.DORIS_READ_FIELD.key(), + DorisSourceOptions.DORIS_READ_FIELD + .key(), "k1,k2"); put( DorisOptions.FENODES.key(), diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisMultiReadIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisMultiReadIT.java new file mode 100644 index 00000000000..dd604b57145 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisMultiReadIT.java @@ -0,0 +1,539 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.connector.doris; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.common.utils.ExceptionUtils; +import org.apache.seatunnel.common.utils.JsonUtils; +import org.apache.seatunnel.connectors.doris.util.DorisCatalogUtil; +import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; +import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; + +import lombok.extern.slf4j.Slf4j; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.net.URL; +import java.net.URLClassLoader; +import java.sql.Connection; +import java.sql.Driver; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +@Slf4j +public class DorisMultiReadIT extends AbstractDorisIT { + private static final String UNIQUE_TABLE_0 = "doris_e2e_unique_table_0"; + private static final String UNIQUE_TABLE_1 = "doris_e2e_unique_table_1"; + private static final String SOURCE_DB_0 = "e2e_source_0"; + private static final String SOURCE_DB_1 = "e2e_source_1"; + private static final String sinkDB = "e2e_sink"; + private Connection conn; + + private static final String INIT_UNIQUE_TABLE_DATA_SQL = + "insert into %s.%s" + + " (\n" + + " F_ID,\n" + + " F_INT,\n" + + " F_BIGINT,\n" + + " F_TINYINT,\n" + + " F_SMALLINT,\n" + + " F_DECIMAL,\n" + + " F_LARGEINT,\n" + + " F_BOOLEAN,\n" + + " F_DOUBLE,\n" + + " F_FLOAT,\n" + + " F_CHAR,\n" + + " F_VARCHAR_11,\n" + + " F_STRING,\n" + + " F_DATETIME_P,\n" + + " F_DATETIME,\n" + + " F_DATE,\n" + + " MAP_VARCHAR_BOOLEAN,\n" + + " MAP_CHAR_TINYINT,\n" + + " MAP_STRING_SMALLINT,\n" + + " MAP_INT_INT,\n" + + " MAP_TINYINT_BIGINT,\n" + + " MAP_SMALLINT_LARGEINT,\n" + + " MAP_BIGINT_FLOAT,\n" + + " MAP_LARGEINT_DOUBLE,\n" + + " MAP_STRING_DECIMAL,\n" + + " MAP_DECIMAL_DATE,\n" + + " MAP_DATE_DATETIME,\n" + + " MAP_DATETIME_CHAR,\n" + + " MAP_CHAR_VARCHAR,\n" + + " MAP_VARCHAR_STRING\n" + + ")values(\n" + + "\t?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?\n" + + ")"; + + private final String UNIQUE_TABLE_COLUMN_STRING = + "F_ID, F_INT, F_BIGINT, F_TINYINT, F_SMALLINT, F_DECIMAL, F_LARGEINT, F_BOOLEAN, F_DOUBLE, F_FLOAT, F_CHAR, F_VARCHAR_11, F_STRING, F_DATETIME_P, F_DATETIME, F_DATE, MAP_VARCHAR_BOOLEAN, MAP_CHAR_TINYINT, MAP_STRING_SMALLINT, MAP_INT_INT, MAP_TINYINT_BIGINT, MAP_SMALLINT_LARGEINT, MAP_BIGINT_FLOAT, MAP_LARGEINT_DOUBLE, MAP_STRING_DECIMAL, MAP_DECIMAL_DATE, MAP_DATE_DATETIME, MAP_DATETIME_CHAR, MAP_CHAR_VARCHAR, MAP_VARCHAR_STRING"; + + @TestContainerExtension + protected final ContainerExtendedFactory extendedFactory = + container -> { + Container.ExecResult extraCommands = + container.execInContainer( + "bash", + "-c", + "mkdir -p /tmp/seatunnel/plugins/jdbc/lib && cd /tmp/seatunnel/plugins/jdbc/lib && wget " + + DRIVER_JAR); + Assertions.assertEquals(0, extraCommands.getExitCode(), extraCommands.getStderr()); + }; + + @TestTemplate + public void testDorisMultiRead(TestContainer container) + throws IOException, InterruptedException { + initializeJdbcTable(); + // init table_0 + batchInsertUniqueTableData(SOURCE_DB_0, UNIQUE_TABLE_0); + // init table_1 + batchInsertUniqueTableData(SOURCE_DB_1, UNIQUE_TABLE_1); + // test assert row num + Container.ExecResult execResult = + container.executeJob("/doris_multi_source_to_assert.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + // execute multi read with 2pc enable + execResult = container.executeJob("/doris_multi_source_to_sink.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + checkSinkData(SOURCE_DB_0, UNIQUE_TABLE_0, "where F_ID >= 50"); + checkSinkData(SOURCE_DB_1, UNIQUE_TABLE_1, "where F_ID < 40"); + // clean sink database data + clearSinkUniqueTable(); + // execute multi read without 2pc enable + Container.ExecResult execResult2 = + container.executeJob("/doris_multi_source_to_sink_2pc_false.conf"); + Assertions.assertEquals(0, execResult2.getExitCode()); + checkSinkData(SOURCE_DB_0, UNIQUE_TABLE_0, "where F_ID >= 50"); + checkSinkData(SOURCE_DB_1, UNIQUE_TABLE_1, "where F_ID < 40"); + // clean all data + clearSourceUniqueTable(); + clearSinkUniqueTable(); + } + + protected void checkSinkData(String database, String tableName, String sqlCondition) { + try { + assertHasData(database, tableName); + assertHasData(sinkDB, tableName); + + PreparedStatement sourcePre = + conn.prepareStatement(DorisCatalogUtil.TABLE_SCHEMA_QUERY); + sourcePre.setString(1, database); + sourcePre.setString(2, tableName); + ResultSet sourceResultSet = sourcePre.executeQuery(); + + PreparedStatement sinkPre = conn.prepareStatement(DorisCatalogUtil.TABLE_SCHEMA_QUERY); + sinkPre.setString(1, sinkDB); + sinkPre.setString(2, tableName); + ResultSet sinkResultSet = sinkPre.executeQuery(); + + while (sourceResultSet.next()) { + if (sinkResultSet.next()) { + String sourceColumnType = sourceResultSet.getString("COLUMN_TYPE"); + String sinkColumnType = sinkResultSet.getString("COLUMN_TYPE"); + // because seatunnel type can not save the scale and length of the key type and + // value type in the MapType, + // so we use the longest scale on the doris sink to prevent data overflow. + if (sourceColumnType.equalsIgnoreCase("map")) { + Assertions.assertEquals("map", sinkColumnType); + continue; + } + + if (sourceColumnType.equalsIgnoreCase("map")) { + Assertions.assertEquals("map", sinkColumnType); + continue; + } + + if (sourceColumnType.equalsIgnoreCase("map")) { + Assertions.assertEquals( + "map", sinkColumnType); + continue; + } + + if (sourceColumnType.equalsIgnoreCase("map")) { + Assertions.assertEquals("map", sinkColumnType); + continue; + } + + if (sourceColumnType.equalsIgnoreCase("map")) { + Assertions.assertEquals("map", sinkColumnType); + continue; + } + + if (sourceColumnType.equalsIgnoreCase("map")) { + Assertions.assertEquals("map", sinkColumnType); + continue; + } + + if (sourceColumnType.equalsIgnoreCase("map")) { + Assertions.assertEquals("map", sinkColumnType); + continue; + } + + if (sourceColumnType.equalsIgnoreCase("map")) { + Assertions.assertEquals("map", sinkColumnType); + continue; + } + + Assertions.assertEquals( + sourceColumnType.toUpperCase(Locale.ROOT), + sinkColumnType.toUpperCase(Locale.ROOT)); + } + } + + String sourceSql = + String.format( + "select * from %s.%s %s order by F_ID ", + database, tableName, sqlCondition); + String sinkSql = String.format("select * from %s.%s order by F_ID", sinkDB, tableName); + checkSourceAndSinkTableDate(sourceSql, sinkSql, UNIQUE_TABLE_COLUMN_STRING); + } catch (Exception e) { + throw new RuntimeException("Doris connection error", e); + } + } + + private void checkSourceAndSinkTableDate(String sourceSql, String sinkSql, String columnsString) + throws Exception { + List columnList = + Arrays.stream(columnsString.split(",")) + .map(x -> x.trim()) + .collect(Collectors.toList()); + Statement sourceStatement = + conn.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_READ_ONLY); + Statement sinkStatement = + conn.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_READ_ONLY); + ResultSet sourceResultSet = sourceStatement.executeQuery(sourceSql); + ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql); + Assertions.assertEquals( + sourceResultSet.getMetaData().getColumnCount(), + sinkResultSet.getMetaData().getColumnCount()); + while (sourceResultSet.next()) { + if (sinkResultSet.next()) { + for (String column : columnList) { + Object source = sourceResultSet.getObject(column); + Object sink = sinkResultSet.getObject(column); + if (!Objects.deepEquals(source, sink)) { + // source read map will create map in doris + // sink, because seatunnel type can not save the scale in MapType + // so we use the longest scale on the doris sink to prevent data overflow. + String sinkStr = sink.toString().replaceAll(".000000", ""); + Assertions.assertEquals(source, sinkStr); + } + } + } + } + // Check the row numbers is equal + sourceResultSet.last(); + sinkResultSet.last(); + Assertions.assertEquals(sourceResultSet.getRow(), sinkResultSet.getRow()); + } + + private Integer tableCount(String db, String table) { + try (Statement statement = conn.createStatement()) { + String sql = String.format("select count(*) from %s.%s", db, table); + ResultSet source = statement.executeQuery(sql); + if (source.next()) { + int rowCount = source.getInt(1); + return rowCount; + } + } catch (Exception e) { + throw new RuntimeException("Failed to check data in Doris server", e); + } + return -1; + } + + private void assertHasData(String db, String table) { + try (Statement statement = conn.createStatement()) { + String sql = String.format("select * from %s.%s limit 1", db, table); + ResultSet source = statement.executeQuery(sql); + Assertions.assertTrue(source.next()); + } catch (Exception e) { + throw new RuntimeException("test doris server image error", e); + } + } + + private void clearSourceUniqueTable() { + try (Statement statement = conn.createStatement()) { + statement.execute(String.format("TRUNCATE TABLE %s.%s", SOURCE_DB_0, UNIQUE_TABLE_0)); + statement.execute(String.format("TRUNCATE TABLE %s.%s", SOURCE_DB_1, UNIQUE_TABLE_1)); + } catch (SQLException e) { + throw new RuntimeException("test doris server image error", e); + } + } + + private void clearSinkUniqueTable() { + try (Statement statement = conn.createStatement()) { + statement.execute(String.format("TRUNCATE TABLE %s.%s", sinkDB, UNIQUE_TABLE_0)); + statement.execute(String.format("TRUNCATE TABLE %s.%s", sinkDB, UNIQUE_TABLE_1)); + } catch (SQLException e) { + throw new RuntimeException("test doris server image error", e); + } + } + + protected void initializeJdbcTable() { + try { + URLClassLoader urlClassLoader = + new URLClassLoader( + new URL[] {new URL(DRIVER_JAR)}, + DorisMultiReadIT.class.getClassLoader()); + Thread.currentThread().setContextClassLoader(urlClassLoader); + Driver driver = (Driver) urlClassLoader.loadClass(DRIVER_CLASS).newInstance(); + Properties props = new Properties(); + props.put("user", USERNAME); + props.put("password", PASSWORD); + conn = driver.connect(String.format(URL, container.getHost()), props); + try (Statement statement = conn.createStatement()) { + // create test databases + statement.execute(createDatabase(SOURCE_DB_0)); + statement.execute(createDatabase(SOURCE_DB_1)); + statement.execute(createDatabase(sinkDB)); + log.info("create source and sink database succeed"); + // create source and sink table + statement.execute(createUniqueTableForTest(SOURCE_DB_0, UNIQUE_TABLE_0)); + statement.execute(createUniqueTableForTest(SOURCE_DB_1, UNIQUE_TABLE_1)); + statement.execute(createUniqueTableForTest(sinkDB, UNIQUE_TABLE_0)); + statement.execute(createUniqueTableForTest(sinkDB, UNIQUE_TABLE_1)); + } catch (SQLException e) { + throw new RuntimeException("Initializing table failed!", e); + } + } catch (Exception e) { + throw new RuntimeException("Initializing jdbc failed!", e); + } + } + + private String createDatabase(String db) { + return String.format("CREATE DATABASE IF NOT EXISTS %s ;", db); + } + + private String createUniqueTableForTest(String db, String table) { + String createTableSql = + "create table if not exists `%s`.`%s`(\n" + + "F_ID bigint null,\n" + + "F_INT int null,\n" + + "F_BIGINT bigint null,\n" + + "F_TINYINT tinyint null,\n" + + "F_SMALLINT smallint null,\n" + + "F_DECIMAL decimal(18,6) null,\n" + + "F_LARGEINT largeint null,\n" + + "F_BOOLEAN boolean null,\n" + + "F_DOUBLE double null,\n" + + "F_FLOAT float null,\n" + + "F_CHAR char null,\n" + + "F_VARCHAR_11 varchar(11) null,\n" + + "F_STRING string null,\n" + + "F_DATETIME_P datetime(6),\n" + + "F_DATETIME datetime,\n" + + "F_DATE date,\n" + + "MAP_VARCHAR_BOOLEAN map,\n" + + "MAP_CHAR_TINYINT MAP,\n" + + "MAP_STRING_SMALLINT MAP,\n" + + "MAP_INT_INT MAP,\n" + + "MAP_TINYINT_BIGINT MAP,\n" + + "MAP_SMALLINT_LARGEINT MAP,\n" + + "MAP_BIGINT_FLOAT MAP,\n" + + "MAP_LARGEINT_DOUBLE MAP,\n" + + "MAP_STRING_DECIMAL MAP,\n" + + "MAP_DECIMAL_DATE MAP,\n" + + "MAP_DATE_DATETIME MAP,\n" + + "MAP_DATETIME_CHAR MAP,\n" + + "MAP_CHAR_VARCHAR MAP,\n" + + "MAP_VARCHAR_STRING MAP\n" + + ")\n" + + "UNIQUE KEY(`F_ID`)\n" + + "DISTRIBUTED BY HASH(`F_ID`) BUCKETS 1\n" + + "properties(\n" + + "\"replication_allocation\" = \"tag.location.default: 1\"" + + ");"; + return String.format(createTableSql, db, table); + } + + protected void batchInsertUniqueTableData(String database, String tableName) { + List rows = genUniqueTableTestData(100L); + try { + conn.setAutoCommit(false); + try (PreparedStatement preparedStatement = + conn.prepareStatement( + String.format(INIT_UNIQUE_TABLE_DATA_SQL, database, tableName))) { + for (SeaTunnelRow row : rows) { + for (int index = 0; index < row.getFields().length; index++) { + preparedStatement.setObject(index + 1, row.getFields()[index]); + } + preparedStatement.addBatch(); + } + preparedStatement.executeBatch(); + } + conn.commit(); + } catch (Exception exception) { + log.error(ExceptionUtils.getMessage(exception)); + String message = ExceptionUtils.getMessage(exception); + getErrorUrl(message); + throw new RuntimeException("get connection error", exception); + } + log.info("insert data succeed"); + } + + private List genUniqueTableTestData(Long nums) { + List datas = new ArrayList<>(); + Map varcharBooleanMap = new HashMap<>(); + varcharBooleanMap.put("aa", true); + + Map charTinyintMap = new HashMap<>(); + charTinyintMap.put("a", (byte) 1); + + Map stringSmallintMap = new HashMap<>(); + stringSmallintMap.put("aa", Short.valueOf("1")); + + Map intIntMap = new HashMap<>(); + intIntMap.put(1, 1); + + Map tinyintBigintMap = new HashMap<>(); + tinyintBigintMap.put((byte) 1, 1L); + + Map smallintLargeintMap = new HashMap<>(); + smallintLargeintMap.put(Short.valueOf("1"), Long.valueOf("11")); + + Map bigintFloatMap = new HashMap<>(); + bigintFloatMap.put(Long.valueOf("1"), Float.valueOf("11.1")); + + Map largeintDoubtMap = new HashMap<>(); + largeintDoubtMap.put(11L, Double.valueOf("11.1")); + + String stringDecimalMap = "{\"11\":\"10.2\"}"; + + String decimalDateMap = "{\"10.02\":\"2020-02-01\"}"; + + String dateDatetimeMap = "{\"2020-02-01\":\"2020-02-01 12:00:00\"}"; + + String datetimeCharMap = "{\"2020-02-01 12:00:00\":\"1\"}"; + + String charVarcharMap = "{\"1\":\"11\"}"; + + String varcharStringMap = "{\"11\":\"11\"}"; + for (int i = 0; i < nums; i++) { + datas.add( + new SeaTunnelRow( + new Object[] { + Long.valueOf(i), + GenerateTestData.genInt(), + GenerateTestData.genBigint(), + GenerateTestData.genTinyint(), + GenerateTestData.genSmallint(), + GenerateTestData.genBigDecimal(18, 6), + GenerateTestData.genBigInteger(126), + GenerateTestData.genBoolean(), + GenerateTestData.genDouble(), + GenerateTestData.genFloat(0, 1000), + GenerateTestData.genString(1), + GenerateTestData.genString(11), + GenerateTestData.genString(12), + GenerateTestData.genDatetimeString(false), + GenerateTestData.genDatetimeString(true), + GenerateTestData.genDateString(), + JsonUtils.toJsonString(varcharBooleanMap), + JsonUtils.toJsonString(charTinyintMap), + JsonUtils.toJsonString(stringSmallintMap), + JsonUtils.toJsonString(intIntMap), + JsonUtils.toJsonString(tinyintBigintMap), + JsonUtils.toJsonString(smallintLargeintMap), + JsonUtils.toJsonString(bigintFloatMap), + JsonUtils.toJsonString(largeintDoubtMap), + stringDecimalMap, + decimalDateMap, + dateDatetimeMap, + datetimeCharMap, + charVarcharMap, + varcharStringMap + })); + } + log.info("generate test data succeed"); + return datas; + } + + @AfterAll + public void close() throws SQLException { + if (conn != null) { + conn.close(); + } + } + + public void getErrorUrl(String message) { + // 使用正则表达式匹配URL + Pattern pattern = Pattern.compile("http://[\\w./?=&-_]+"); + Matcher matcher = pattern.matcher(message); + String urlString = null; + if (matcher.find()) { + log.error("Found URL: " + matcher.group()); + urlString = matcher.group(); + } else { + log.error("No URL found."); + return; + } + + try { + URL url = new URL(urlString); + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + + // 设置请求方法 + connection.setRequestMethod("GET"); + + // 设置连接超时时间 + connection.setConnectTimeout(5000); + // 设置读取超时时间 + connection.setReadTimeout(5000); + + int responseCode = connection.getResponseCode(); + + if (responseCode == HttpURLConnection.HTTP_OK) { + BufferedReader in = + new BufferedReader(new InputStreamReader(connection.getInputStream())); + String inputLine; + StringBuilder response = new StringBuilder(); + + while ((inputLine = in.readLine()) != null) { + response.append(inputLine); + } + in.close(); + } else { + log.error("GET request not worked"); + } + } catch (Exception e) { + log.error(ExceptionUtils.getMessage(e)); + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris_multi_source_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris_multi_source_to_assert.conf new file mode 100644 index 00000000000..d067671c8ee --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris_multi_source_to_assert.conf @@ -0,0 +1,80 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env{ + parallelism = 1 + job.mode = "BATCH" +} + +source{ + Doris { + fenodes = "doris_e2e:8030" + username = root + password = "" + table_list = [ + { + database = "e2e_source_0" + table = "doris_e2e_unique_table_0" + doris.read.field = "F_ID,F_INT,F_BIGINT,F_TINYINT,F_SMALLINT,F_DECIMAL,F_LARGEINT,F_BOOLEAN,F_DOUBLE,F_FLOAT,F_CHAR,F_VARCHAR_11,F_STRING,F_DATETIME_P,F_DATETIME,F_DATE,MAP_VARCHAR_BOOLEAN, MAP_CHAR_TINYINT, MAP_STRING_SMALLINT, MAP_INT_INT, MAP_TINYINT_BIGINT, MAP_SMALLINT_LARGEINT, MAP_BIGINT_FLOAT, MAP_LARGEINT_DOUBLE, MAP_STRING_DECIMAL, MAP_DECIMAL_DATE, MAP_DATE_DATETIME, MAP_DATETIME_CHAR, MAP_CHAR_VARCHAR, MAP_VARCHAR_STRING" + doris.filter.query = "F_ID >= 50" + }, + { + database = "e2e_source_1" + table = "doris_e2e_unique_table_1" + doris.read.field = "F_ID,F_INT,F_BIGINT,F_TINYINT,F_SMALLINT,F_DECIMAL,F_LARGEINT,F_BOOLEAN,F_DOUBLE,F_FLOAT,F_CHAR,F_VARCHAR_11,F_STRING,F_DATETIME_P,F_DATETIME,F_DATE,MAP_VARCHAR_BOOLEAN, MAP_CHAR_TINYINT, MAP_STRING_SMALLINT, MAP_INT_INT, MAP_TINYINT_BIGINT, MAP_SMALLINT_LARGEINT, MAP_BIGINT_FLOAT, MAP_LARGEINT_DOUBLE, MAP_STRING_DECIMAL, MAP_DECIMAL_DATE, MAP_DATE_DATETIME, MAP_DATETIME_CHAR, MAP_CHAR_VARCHAR, MAP_VARCHAR_STRING" + doris.filter.query = "F_ID < 40" + } + ] + } +} + +transform {} + +sink { + Assert { + rules = { + tables_configs = [ + { + table_path = "e2e_source_0.doris_e2e_unique_table_0" + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 50 + }, + { + rule_type = MIN_ROW + rule_value = 50 + } + ] + }, + { + table_path = "e2e_source_1.doris_e2e_unique_table_1" + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 40 + }, + { + rule_type = MIN_ROW + rule_value = 40 + } + ] + } + ] + } + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris_multi_source_to_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris_multi_source_to_sink.conf new file mode 100644 index 00000000000..df36da514ad --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris_multi_source_to_sink.conf @@ -0,0 +1,62 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env{ + parallelism = 1 + job.mode = "BATCH" +} + +source{ + Doris { + fenodes = "doris_e2e:8030" + username = root + password = "" + table_list = [ + { + database = "e2e_source_0" + table = "doris_e2e_unique_table_0" + doris.read.field = "F_ID,F_INT,F_BIGINT,F_TINYINT,F_SMALLINT,F_DECIMAL,F_LARGEINT,F_BOOLEAN,F_DOUBLE,F_FLOAT,F_CHAR,F_VARCHAR_11,F_STRING,F_DATETIME_P,F_DATETIME,F_DATE,MAP_VARCHAR_BOOLEAN, MAP_CHAR_TINYINT, MAP_STRING_SMALLINT, MAP_INT_INT, MAP_TINYINT_BIGINT, MAP_SMALLINT_LARGEINT, MAP_BIGINT_FLOAT, MAP_LARGEINT_DOUBLE, MAP_STRING_DECIMAL, MAP_DECIMAL_DATE, MAP_DATE_DATETIME, MAP_DATETIME_CHAR, MAP_CHAR_VARCHAR, MAP_VARCHAR_STRING" + doris.filter.query = "F_ID >= 50" + }, + { + database = "e2e_source_1" + table = "doris_e2e_unique_table_1" + doris.read.field = "F_ID,F_INT,F_BIGINT,F_TINYINT,F_SMALLINT,F_DECIMAL,F_LARGEINT,F_BOOLEAN,F_DOUBLE,F_FLOAT,F_CHAR,F_VARCHAR_11,F_STRING,F_DATETIME_P,F_DATETIME,F_DATE,MAP_VARCHAR_BOOLEAN, MAP_CHAR_TINYINT, MAP_STRING_SMALLINT, MAP_INT_INT, MAP_TINYINT_BIGINT, MAP_SMALLINT_LARGEINT, MAP_BIGINT_FLOAT, MAP_LARGEINT_DOUBLE, MAP_STRING_DECIMAL, MAP_DECIMAL_DATE, MAP_DATE_DATETIME, MAP_DATETIME_CHAR, MAP_CHAR_VARCHAR, MAP_VARCHAR_STRING" + doris.filter.query = "F_ID < 40" + } + ] + } +} + +transform {} + +sink{ + Doris { + fenodes = "doris_e2e:8030" + schema_save_mode = "RECREATE_SCHEMA" + username = root + password = "" + database = "e2e_sink" + table = "${table_name}" + sink.enable-2pc = "true" + sink.label-prefix = "test_json" + doris.config = { + format="json" + read_json_by_line="true" + } + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris_multi_source_to_sink_2pc_false.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris_multi_source_to_sink_2pc_false.conf new file mode 100644 index 00000000000..d17a90b0181 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris_multi_source_to_sink_2pc_false.conf @@ -0,0 +1,62 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env{ + parallelism = 1 + job.mode = "BATCH" +} + +source{ + Doris { + fenodes = "doris_e2e:8030" + username = root + password = "" + table_list = [ + { + database = "e2e_source_0" + table = "doris_e2e_unique_table_0" + doris.read.field = "F_ID,F_INT,F_BIGINT,F_TINYINT,F_SMALLINT,F_DECIMAL,F_LARGEINT,F_BOOLEAN,F_DOUBLE,F_FLOAT,F_CHAR,F_VARCHAR_11,F_STRING,F_DATETIME_P,F_DATETIME,F_DATE,MAP_VARCHAR_BOOLEAN, MAP_CHAR_TINYINT, MAP_STRING_SMALLINT, MAP_INT_INT, MAP_TINYINT_BIGINT, MAP_SMALLINT_LARGEINT, MAP_BIGINT_FLOAT, MAP_LARGEINT_DOUBLE, MAP_STRING_DECIMAL, MAP_DECIMAL_DATE, MAP_DATE_DATETIME, MAP_DATETIME_CHAR, MAP_CHAR_VARCHAR, MAP_VARCHAR_STRING" + doris.filter.query = "F_ID >= 50" + }, + { + database = "e2e_source_1" + table = "doris_e2e_unique_table_1" + doris.read.field = "F_ID,F_INT,F_BIGINT,F_TINYINT,F_SMALLINT,F_DECIMAL,F_LARGEINT,F_BOOLEAN,F_DOUBLE,F_FLOAT,F_CHAR,F_VARCHAR_11,F_STRING,F_DATETIME_P,F_DATETIME,F_DATE,MAP_VARCHAR_BOOLEAN, MAP_CHAR_TINYINT, MAP_STRING_SMALLINT, MAP_INT_INT, MAP_TINYINT_BIGINT, MAP_SMALLINT_LARGEINT, MAP_BIGINT_FLOAT, MAP_LARGEINT_DOUBLE, MAP_STRING_DECIMAL, MAP_DECIMAL_DATE, MAP_DATE_DATETIME, MAP_DATETIME_CHAR, MAP_CHAR_VARCHAR, MAP_VARCHAR_STRING" + doris.filter.query = "F_ID < 40" + } + ] + } +} + +transform {} + +sink{ + Doris { + fenodes = "doris_e2e:8030" + schema_save_mode = "RECREATE_SCHEMA" + username = root + password = "" + database = "e2e_sink" + table = "${table_name}" + sink.enable-2pc = "false" + sink.label-prefix = "test_json" + doris.config = { + format="json" + read_json_by_line="true" + } + } +} \ No newline at end of file