Skip to content

Commit

Permalink
[Feature][Doris] Support multi-table source read (#7895)
Browse files Browse the repository at this point in the history
Co-authored-by: happyboy1024 <296442618@qq.com>
  • Loading branch information
happyboy1024 and happyboy1024 authored Oct 25, 2024
1 parent 25ae492 commit 10c37ac
Show file tree
Hide file tree
Showing 35 changed files with 1,947 additions and 591 deletions.
80 changes: 65 additions & 15 deletions docs/en/connector-v2/source/Doris.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,21 @@
- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [x] [schema projection](../../concept/connector-v2-features.md)
- [x] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [x] [support user-defined split](../../concept/connector-v2-features.md)
- [x] [support multiple table read](../../concept/connector-v2-features.md)

## Description

Used to read data from Doris.
Doris Source will send a SQL to FE, FE will parse it into an execution plan, send it to BE, and BE will
directly return the data
Used to read data from Apache Doris.

## Supported DataSource Info

| Datasource | Supported versions | Driver | Url | Maven |
|------------|--------------------------------------|--------|-----|-------|
| Doris | Only Doris2.0 or later is supported. | - | - | - |

## Database Dependency

> Please download the support list corresponding to 'Maven' and copy it to the '$SEATNUNNEL_HOME/plugins/jdbc/lib/'
> working directory<br/>
## Data Type Mapping

| Doris Data type | SeaTunnel Data type |
Expand All @@ -54,29 +48,40 @@ directly return the data

## Source Options

Base configuration:

| Name | Type | Required | Default | Description |
|----------------------------------|--------|----------|------------|-----------------------------------------------------------------------------------------------------|
| fenodes | string | yes | - | FE address, the format is `"fe_host:fe_http_port"` |
| username | string | yes | - | User username |
| password | string | yes | - | User password |
| doris.request.retries | int | no | 3 | Number of retries to send requests to Doris FE. |
| doris.request.read.timeout.ms | int | no | 30000 | |
| doris.request.connect.timeout.ms | int | no | 30000 | |
| query-port | string | no | 9030 | Doris QueryPort |
| doris.request.query.timeout.s | int | no | 3600 | Timeout period of Doris scan data, expressed in seconds. |
| table_list | string || - | table list |

Table list configuration:

| Name | Type | Required | Default | Description |
|----------------------------------|--------|----------|------------|-----------------------------------------------------------------------------------------------------|
| database | string | yes | - | The name of Doris database |
| table | string | yes | - | The name of Doris table |
| doris.read.field | string | no | - | Use the 'doris.read.field' parameter to select the doris table columns to read |
| query-port | string | no | 9030 | Doris QueryPort |
| doris.filter.query | string | no | - | Data filtering in doris. the format is "field = value",example : doris.filter.query = "F_ID > 2" |
| doris.batch.size | int | no | 1024 | The maximum value that can be obtained by reading Doris BE once. |
| doris.request.query.timeout.s | int | no | 3600 | Timeout period of Doris scan data, expressed in seconds. |
| doris.exec.mem.limit | long | no | 2147483648 | Maximum memory that can be used by a single be scan request. The default memory is 2G (2147483648). |
| doris.request.retries | int | no | 3 | Number of retries to send requests to Doris FE. |
| doris.request.read.timeout.ms | int | no | 30000 | |
| doris.request.connect.timeout.ms | int | no | 30000 | |

Note: When this configuration corresponds to a single table, you can flatten the configuration items in table_list to the outer layer.

### Tips

> It is not recommended to modify advanced parameters at will
## Task Example
## Example

### single table
> This is an example of reading a Doris table and writing to Console.
```
Expand Down Expand Up @@ -159,4 +164,49 @@ sink {
Console {}
}
```
### Multiple table
```
env{
parallelism = 1
job.mode = "BATCH"
}
source{
Doris {
fenodes = "xxxx:8030"
username = root
password = ""
table_list = [
{
database = "st_source_0"
table = "doris_table_0"
doris.read.field = "F_ID,F_INT,F_BIGINT,F_TINYINT"
doris.filter.query = "F_ID >= 50"
},
{
database = "st_source_1"
table = "doris_table_1"
}
]
}
}
transform {}
sink{
Doris {
fenodes = "xxxx:8030"
schema_save_mode = "RECREATE_SCHEMA"
username = root
password = ""
database = "st_sink"
table = "${table_name}"
sink.enable-2pc = "true"
sink.label-prefix = "test_json"
doris.config = {
format="json"
read_json_by_line="true"
}
}
}
```
212 changes: 212 additions & 0 deletions docs/zh/connector-v2/source/Doris.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
# Doris

> Doris 源连接器
## 支持的引擎

> Spark<br/>
> Flink<br/>
> SeaTunnel Zeta<br/>
## 主要功能

- [x] [批处理](../../concept/connector-v2-features.md)
- [ ] [流处理](../../concept/connector-v2-features.md)
- [ ] [精确一次](../../concept/connector-v2-features.md)
- [x] [列投影](../../concept/connector-v2-features.md)
- [x] [并行度](../../concept/connector-v2-features.md)
- [x] [支持用户自定义分片](../../concept/connector-v2-features.md)
- [x] [支持多表读](../../concept/connector-v2-features.md)

## 描述

用于 Apache Doris 的源连接器。

## 支持的数据源信息

| 数据源 | 支持版本 | 驱动 | Url | Maven |
|------------|--------------------------------------|--------|-----|-------|
| Doris | 仅支持Doris2.0及以上版本. | - | - | - |

## 数据类型映射

| Doris 数据类型 | SeaTunnel 数据类型 |
|--------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------|
| INT | INT |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| BIGINT | BIGINT |
| LARGEINT | STRING |
| BOOLEAN | BOOLEAN |
| DECIMAL | DECIMAL((Get the designated column's specified column size)+1,<br/>(Gets the designated column's number of digits to right of the decimal point.))) |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| CHAR<br/>VARCHAR<br/>STRING<br/>TEXT | STRING |
| DATE | DATE |
| DATETIME<br/>DATETIME(p) | TIMESTAMP |
| ARRAY | ARRAY |

## 源选项

基础配置:

| 名称 | 类型 | 是否必须 | 默认值 | 描述 |
|----------------------------------|--------|----------|------------|-----------------------------------------------------------------------------------------------------|
| fenodes | string | yes | - | FE 地址, 格式:`"fe_host:fe_http_port"` |
| username | string | yes | - | 用户名 |
| password | string | yes | - | 密码 |
| doris.request.retries | int | no | 3 | 请求Doris FE的重试次数 |
| doris.request.read.timeout.ms | int | no | 30000 | |
| doris.request.connect.timeout.ms | int | no | 30000 | |
| query-port | string | no | 9030 | Doris查询端口 |
| doris.request.query.timeout.s | int | no | 3600 | Doris扫描数据的超时时间,单位秒 |
| table_list | string || - | 表清单 |

表清单配置:

| 名称 | 类型 | 是否必须 | 默认值 | 描述 |
|----------------------------------|--------|----------|------------|-----------------------------------------------------------------------------------------------------|
| database | string | yes | - | 数据库 |
| table | string | yes | - | 表名 |
| doris.read.field | string | no | - | 选择要读取的Doris表字段 |
| doris.filter.query | string | no | - | 数据过滤. 格式:"字段 = 值", 例如:doris.filter.query = "F_ID > 2" |
| doris.batch.size | int | no | 1024 | 每次能够从BE中读取到的最大行数 |
| doris.exec.mem.limit | long | no | 2147483648 | 单个be扫描请求可以使用的最大内存。默认内存为2G(2147483648) |

注意: 当此配置对应于单个表时,您可以将table_list中的配置项展平到外层。

### 提示

> 不建议随意修改高级参数
## 例子

### 单表
> 这是一个从doris读取数据后,输出到控制台的例子:
```
env {
parallelism = 2
job.mode = "BATCH"
}
source{
Doris {
fenodes = "doris_e2e:8030"
username = root
password = ""
database = "e2e_source"
table = "doris_e2e_table"
}
}
transform {
# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/transform/sql
}
sink {
Console {}
}
```

使用`doris.read.field`参数来选择需要读取的Doris表字段:

```
env {
parallelism = 2
job.mode = "BATCH"
}
source{
Doris {
fenodes = "doris_e2e:8030"
username = root
password = ""
database = "e2e_source"
table = "doris_e2e_table"
doris.read.field = "F_ID,F_INT,F_BIGINT,F_TINYINT,F_SMALLINT"
}
}
transform {
# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/transform/sql
}
sink {
Console {}
}
```

使用`doris.filter.query`来过滤数据,参数值将作为过滤条件直接传递到doris:

```
env {
parallelism = 2
job.mode = "BATCH"
}
source{
Doris {
fenodes = "doris_e2e:8030"
username = root
password = ""
database = "e2e_source"
table = "doris_e2e_table"
doris.filter.query = "F_ID > 2"
}
}
transform {
# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/transform/sql
}
sink {
Console {}
}
```
### 多表
```
env{
parallelism = 1
job.mode = "BATCH"
}
source{
Doris {
fenodes = "xxxx:8030"
username = root
password = ""
table_list = [
{
database = "st_source_0"
table = "doris_table_0"
doris.read.field = "F_ID,F_INT,F_BIGINT,F_TINYINT"
doris.filter.query = "F_ID >= 50"
},
{
database = "st_source_1"
table = "doris_table_1"
}
]
}
}
transform {}
sink{
Doris {
fenodes = "xxxx:8030"
schema_save_mode = "RECREATE_SCHEMA"
username = root
password = ""
database = "st_sink"
table = "${table_name}"
sink.enable-2pc = "true"
sink.label-prefix = "test_json"
doris.config = {
format="json"
read_json_by_line="true"
}
}
}
```
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.seatunnel.shade.org.apache.thrift.transport.TTransport;
import org.apache.seatunnel.shade.org.apache.thrift.transport.TTransportException;

import org.apache.seatunnel.connectors.doris.config.DorisConfig;
import org.apache.seatunnel.connectors.doris.config.DorisSourceConfig;
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode;
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
import org.apache.seatunnel.connectors.doris.source.serialization.Routing;
Expand Down Expand Up @@ -55,7 +55,7 @@ public class BackendClient {
private final int socketTimeout;
private final int connectTimeout;

public BackendClient(Routing routing, DorisConfig readOptions) {
public BackendClient(Routing routing, DorisSourceConfig readOptions) {
this.routing = routing;
this.connectTimeout = readOptions.getRequestConnectTimeoutMs();
this.socketTimeout = readOptions.getRequestReadTimeoutMs();
Expand Down
Loading

0 comments on commit 10c37ac

Please sign in to comment.