Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][Connector-V2][Doris]Refactor some Doris Sink code as well as support 2pc and cdc #4235

Merged
merged 21 commits into from
Mar 8, 2023
Merged
8 changes: 5 additions & 3 deletions docs/en/connector-v2/Error-Quick-Reference-Manual.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,11 @@ problems encountered by users.

## Doris Connector Error Codes

| code | description | solution |
|----------|----------------------------------|--------------------------------------------------------------------------------------------------------------------------------------|
| Doris-01 | Writing records to Doris failed. | When users encounter this error code, it means that writing records to Doris failed, please check data from files whether is correct |
| code | description | solution |
|----------|---------------------|-----------------------------------------------------------------------------------------------------------------------------------|
| Doris-01 | stream load error. | When users encounter this error code, it means that stream load to Doris failed, please check data from files whether is correct. |
| Doris-02 | commit error. | When users encounter this error code, it means that commit to Doris failed, please check network. |
| Doris-03 | rest service error. | When users encounter this error code, it means that rest service failed, please check network and config. |

## SelectDB Cloud Connector Error Codes

Expand Down
113 changes: 53 additions & 60 deletions docs/en/connector-v2/sink/Doris.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,37 @@
Used to send data to Doris. Both support streaming and batch mode.
The internal implementation of Doris sink connector is cached and imported by stream load in batches.

:::tip

Version Supported

* exactly-once & cdc supported `Doris version is >= 1.1.x`
* Array data type supported `Doris version is >= 1.2.x`
* Map data type will be support in `Doris version is 2.x`

:::

## Key features

- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [x] [exactly-once](../../concept/connector-v2-features.md)
- [x] [cdc](../../concept/connector-v2-features.md)

## Options

| name | type | required | default value |
|-----------------------------|--------|----------|-----------------|
| node_urls | list | yes | - |
| username | string | yes | - |
| password | string | yes | - |
| database | string | yes | - |
| table | string | yes | - |
| labelPrefix | string | no | - |
| batch_max_rows | long | no | 1024 |
| batch_max_bytes | int | no | 5 * 1024 * 1024 |
| batch_interval_ms | int | no | 1000 |
| max_retries | int | no | 1 |
| retry_backoff_multiplier_ms | int | no | - |
| max_retry_backoff_ms | int | no | - |
| doris.config | map | no | - |

### node_urls [list]

`Doris` cluster address, the format is `["fe_ip:fe_http_port", ...]`
| name | type | required | default value |
|--------------------|--------|----------|---------------|
| fenodes | string | yes | - |
| username | string | yes | - |
| password | string | yes | - |
| table.identifier | string | yes | - |
| sink.label-prefix | string | yes | - |
| sink.enable-2pc | bool | no | true |
| sink.enable-delete | bool | no | false |
| doris.config | map | yes | - |

### fenodes [string]

`Doris` cluster fenodes address, the format is `"fe_ip:fe_http_port, ..."`

### username [string]

Expand All @@ -41,47 +47,29 @@ The internal implementation of Doris sink connector is cached and imported by st

`Doris` user password

### database [string]

The name of `Doris` database

### table [string]
### table.identifier [string]

The name of `Doris` table

### labelPrefix [string]

The prefix of `Doris` stream load label
### sink.label-prefix [string]

### batch_max_rows [long]
The label prefix used by stream load imports. In the 2pc scenario, global uniqueness is required to ensure the EOS semantics of SeaTunnel.

For batch writing, when the number of buffers reaches the number of `batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches `batch_interval_ms`, the data will be flushed into the Doris
### sink.enable-2pc [bool]

### batch_max_bytes [int]
Whether to enable two-phase commit (2pc), the default is true, to ensure Exactly-Once semantics. For two-phase commit, please refer to [here](https://doris.apache.org/docs/dev/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD).

For batch writing, when the number of buffers reaches the number of `batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches `batch_interval_ms`, the data will be flushed into the Doris
### sink.enable-delete [bool]

### batch_interval_ms [int]
Whether to enable deletion. This option requires Doris table to enable batch delete function (0.15+ version is enabled by default), and only supports Unique model. you can get more detail at this link:

For batch writing, when the number of buffers reaches the number of `batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches `batch_interval_ms`, the data will be flushed into the Doris

### max_retries [int]

The number of retries to flush failed

### retry_backoff_multiplier_ms [int]

Using as a multiplier for generating the next delay for backoff

### max_retry_backoff_ms [int]

The amount of time to wait before attempting to retry a request to `Doris`
https://doris.apache.org/docs/dev/data-operate/update-delete/batch-delete-manual

### doris.config [map]

The parameter of the stream load `data_desc`, you can get more detail at this link:

https://doris.apache.org/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD/
https://doris.apache.org/docs/dev/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD

#### Supported import data formats

Expand All @@ -94,15 +82,15 @@ Use JSON format to import data
```
sink {
Doris {
nodeUrls = ["e2e_dorisdb:8030"]
fenodes = ["e2e_dorisdb:8030"]
username = root
password = ""
database = "test"
table = "e2e_table_sink"
batch_max_rows = 100
table.identifier = "test.e2e_table_sink"
sink.enable-2pc = "true"
sink.label-prefix = "test_json"
doris.config = {
format = "JSON"
strip_outer_array = true
format="json"
read_json_by_line="true"
}
}
}
Expand All @@ -114,16 +102,14 @@ Use CSV format to import data
```
sink {
Doris {
nodeUrls = ["e2e_dorisdb:8030"]
fenodes = ["e2e_dorisdb:8030"]
username = root
password = ""
database = "test"
table = "e2e_table_sink"
batch_max_rows = 100
sink.properties.format = "CSV"
sink.properties.column_separator = ","
table.identifier = "test.e2e_table_sink"
sink.enable-2pc = "true"
sink.label-prefix = "test_csv"
doris.config = {
format = "CSV"
format = "csv"
column_separator = ","
}
}
Expand All @@ -140,3 +126,10 @@ sink {

- [Improve] Change Doris Config Prefix [3856](https://github.com/apache/incubator-seatunnel/pull/3856)

- [Improve] Refactor some Doris Sink code as well as support 2pc and cdc [4235](https://github.com/apache/incubator-seatunnel/pull/4235)

:::tip

PR 4235 is an incompatible modification to PR 3856. Please refer to PR 4235 to use the new Doris connector

:::
1 change: 1 addition & 0 deletions release-note.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
- [File] Support column projection #4105
- [Github] Add github source connector #4155
- [Jdbc] Add database field to sink config #4199
- [Doris] Refactor some Doris Sink code as well as support 2pc and cdc #4235
### Zeta Engine
- [Chore] Remove unnecessary dependencies #3795
- [Core] Improve job restart of all node down #3784
Expand Down
5 changes: 5 additions & 0 deletions seatunnel-connectors-v2/connector-doris/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,10 @@
<artifactId>seatunnel-format-text</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>${commons-io.version}</version>
</dependency>
</dependencies>
</project>

This file was deleted.

Loading