diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
index e1816736233..74df16fe85d 100644
--- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
+++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
@@ -161,9 +161,11 @@ problems encountered by users.
## Doris Connector Error Codes
-| code | description | solution |
-|----------|----------------------------------|--------------------------------------------------------------------------------------------------------------------------------------|
-| Doris-01 | Writing records to Doris failed. | When users encounter this error code, it means that writing records to Doris failed, please check data from files whether is correct |
+| code | description | solution |
+|----------|---------------------|-----------------------------------------------------------------------------------------------------------------------------------|
+| Doris-01 | stream load error. | Stream load to Doris failed. Check whether the data in the files is correct. |
+| Doris-02 | commit error. | Commit to Doris failed. Check the network connection. |
+| Doris-03 | rest service error. | The REST service call failed. Check the network and the configuration. |
## SelectDB Cloud Connector Error Codes
diff --git a/docs/en/connector-v2/sink/Doris.md b/docs/en/connector-v2/sink/Doris.md
index 47e893b22b5..acffad6a7ca 100644
--- a/docs/en/connector-v2/sink/Doris.md
+++ b/docs/en/connector-v2/sink/Doris.md
@@ -7,31 +7,37 @@
Used to send data to Doris. Both support streaming and batch mode.
The internal implementation of Doris sink connector is cached and imported by stream load in batches.
+:::tip
+
+Supported versions
+
+* Exactly-once and CDC are supported with `Doris version >= 1.1.x`
+* The Array data type is supported with `Doris version >= 1.2.x`
+* The Map data type will be supported in `Doris version 2.x`
+
+:::
+
## Key features
-- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [cdc](../../concept/connector-v2-features.md)
## Options
-| name | type | required | default value |
-|-----------------------------|--------|----------|-----------------|
-| node_urls | list | yes | - |
-| username | string | yes | - |
-| password | string | yes | - |
-| database | string | yes | - |
-| table | string | yes | - |
-| labelPrefix | string | no | - |
-| batch_max_rows | long | no | 1024 |
-| batch_max_bytes | int | no | 5 * 1024 * 1024 |
-| batch_interval_ms | int | no | 1000 |
-| max_retries | int | no | 1 |
-| retry_backoff_multiplier_ms | int | no | - |
-| max_retry_backoff_ms | int | no | - |
-| doris.config | map | no | - |
-
-### node_urls [list]
-
-`Doris` cluster address, the format is `["fe_ip:fe_http_port", ...]`
+| name | type | required | default value |
+|--------------------|--------|----------|---------------|
+| fenodes | string | yes | - |
+| username | string | yes | - |
+| password | string | yes | - |
+| table.identifier | string | yes | - |
+| sink.label-prefix | string | yes | - |
+| sink.enable-2pc | bool | no | true |
+| sink.enable-delete | bool | no | false |
+| doris.config | map | yes | - |
+
+### fenodes [string]
+
+`Doris` cluster FE node addresses, in the format `"fe_ip:fe_http_port, ..."`
### username [string]
@@ -41,47 +47,29 @@ The internal implementation of Doris sink connector is cached and imported by st
`Doris` user password
-### database [string]
-
-The name of `Doris` database
-
-### table [string]
+### table.identifier [string]
The name of `Doris` table
-### labelPrefix [string]
-
-The prefix of `Doris` stream load label
+### sink.label-prefix [string]
-### batch_max_rows [long]
+The label prefix used by stream load imports. In the 2pc scenario, the prefix must be globally unique to ensure the exactly-once semantics (EOS) of SeaTunnel.
-For batch writing, when the number of buffers reaches the number of `batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches `batch_interval_ms`, the data will be flushed into the Doris
+### sink.enable-2pc [bool]
-### batch_max_bytes [int]
+Whether to enable two-phase commit (2pc) to ensure exactly-once semantics. The default is true. For details on two-phase commit, see the [Stream Load documentation](https://doris.apache.org/docs/dev/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD).
-For batch writing, when the number of buffers reaches the number of `batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches `batch_interval_ms`, the data will be flushed into the Doris
+### sink.enable-delete [bool]
-### batch_interval_ms [int]
+Whether to enable deletion. This option requires the batch delete feature to be enabled on the Doris table (enabled by default in version 0.15+) and only supports the Unique model. For more details, see:
-For batch writing, when the number of buffers reaches the number of `batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches `batch_interval_ms`, the data will be flushed into the Doris
-
-### max_retries [int]
-
-The number of retries to flush failed
-
-### retry_backoff_multiplier_ms [int]
-
-Using as a multiplier for generating the next delay for backoff
-
-### max_retry_backoff_ms [int]
-
-The amount of time to wait before attempting to retry a request to `Doris`
+https://doris.apache.org/docs/dev/data-operate/update-delete/batch-delete-manual
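+
+As a minimal sketch of how the options above might be combined for a CDC job that propagates deletes, consider the config below. The host, table name, and label prefix are illustrative assumptions, and the target table is assumed to use the Unique model with batch delete enabled:
+
+```
+sink {
+  Doris {
+    # Placeholder FE address; replace with your own "fe_ip:fe_http_port"
+    fenodes = "fe_ip:fe_http_port"
+    username = root
+    password = ""
+    # Hypothetical Unique-model table, used here only for illustration
+    table.identifier = "test.e2e_table_sink"
+    # Hypothetical prefix; must be globally unique when 2pc is enabled
+    sink.label-prefix = "test_cdc"
+    sink.enable-2pc = "true"
+    sink.enable-delete = "true"
+    doris.config = {
+      format = "json"
+      read_json_by_line = "true"
+    }
+  }
+}
+```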
### doris.config [map]
The parameter of the stream load `data_desc`, you can get more detail at this link:
-https://doris.apache.org/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD/
+https://doris.apache.org/docs/dev/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD
#### Supported import data formats
@@ -94,15 +82,15 @@ Use JSON format to import data
```
sink {
Doris {
- nodeUrls = ["e2e_dorisdb:8030"]
+ fenodes = "e2e_dorisdb:8030"
username = root
password = ""
- database = "test"
- table = "e2e_table_sink"
- batch_max_rows = 100
+ table.identifier = "test.e2e_table_sink"
+ sink.enable-2pc = "true"
+ sink.label-prefix = "test_json"
doris.config = {
- format = "JSON"
- strip_outer_array = true
+ format = "json"
+ read_json_by_line = "true"
}
}
}
@@ -114,16 +102,14 @@ Use CSV format to import data
```
sink {
Doris {
- nodeUrls = ["e2e_dorisdb:8030"]
+ fenodes = "e2e_dorisdb:8030"
username = root
password = ""
- database = "test"
- table = "e2e_table_sink"
- batch_max_rows = 100
- sink.properties.format = "CSV"
- sink.properties.column_separator = ","
+ table.identifier = "test.e2e_table_sink"
+ sink.enable-2pc = "true"
+ sink.label-prefix = "test_csv"
doris.config = {
- format = "CSV"
+ format = "csv"
column_separator = ","
}
}
@@ -140,3 +126,10 @@ sink {
- [Improve] Change Doris Config Prefix [3856](https://github.com/apache/incubator-seatunnel/pull/3856)
+- [Improve] Refactor some Doris Sink code as well as support 2pc and cdc [4235](https://github.com/apache/incubator-seatunnel/pull/4235)
+
+:::tip
+
+PR 4235 is incompatible with the changes in PR 3856. Refer to PR 4235 for how to use the new Doris connector.
+
+:::
diff --git a/release-note.md b/release-note.md
index dcf38096c72..36a7d376253 100644
--- a/release-note.md
+++ b/release-note.md
@@ -44,6 +44,7 @@
- [File] Support column projection #4105
- [Github] Add github source connector #4155
- [Jdbc] Add database field to sink config #4199
+- [Doris] Refactor some Doris Sink code as well as support 2pc and cdc #4235
### Zeta Engine
- [Chore] Remove unnecessary dependencies #3795
- [Core] Improve job restart of all node down #3784
diff --git a/seatunnel-connectors-v2/connector-doris/pom.xml b/seatunnel-connectors-v2/connector-doris/pom.xml
index f0655067427..b40a849255b 100644
--- a/seatunnel-connectors-v2/connector-doris/pom.xml
+++ b/seatunnel-connectors-v2/connector-doris/pom.xml
@@ -65,5 +65,10 @@
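             <artifactId>seatunnel-format-text</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <version>${commons-io.version}</version>
+        </dependency>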
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/DorisSinkManager.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/DorisSinkManager.java
deleted file mode 100644
index bccc7da9ae0..00000000000
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/DorisSinkManager.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.doris.client;
-
-import org.apache.seatunnel.common.exception.CommonErrorCode;
-import org.apache.seatunnel.connectors.doris.config.SinkConfig;
-import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode;
-import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
-
-import com.google.common.base.Strings;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-@Slf4j
-public class DorisSinkManager {
-
- private final SinkConfig sinkConfig;
- private final List<byte[]> batchList;
-
- private final DorisStreamLoadVisitor dorisStreamLoadVisitor;
- private ScheduledExecutorService scheduler;
- private ScheduledFuture<?> scheduledFuture;
- private volatile boolean initialize;
- private volatile Exception flushException;
- private int batchRowCount = 0;
- private long batchBytesSize = 0;
-
- private final Integer batchIntervalMs;
-
- public DorisSinkManager(SinkConfig sinkConfig, List<String> fileNames) {
- this.sinkConfig = sinkConfig;
- this.batchList = new ArrayList<>();
- this.batchIntervalMs = sinkConfig.getBatchIntervalMs();
- dorisStreamLoadVisitor = new DorisStreamLoadVisitor(sinkConfig, fileNames);
- }
-
- private void tryInit() throws IOException {
- if (initialize) {
- return;
- }
- initialize = true;
-
- scheduler =
- Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder().setNameFormat("Doris-sink-output-%s").build());
- scheduledFuture =
- scheduler.scheduleAtFixedRate(
- () -> {
- try {
- flush();
- } catch (IOException e) {
- flushException = e;
- }
- },
- batchIntervalMs,
- batchIntervalMs,
- TimeUnit.MILLISECONDS);
- }
-
- public synchronized void write(String record) throws IOException {
- tryInit();
- checkFlushException();
- byte[] bts = record.getBytes(StandardCharsets.UTF_8);
- batchList.add(bts);
- batchRowCount++;
- batchBytesSize += bts.length;
- if (batchRowCount >= sinkConfig.getBatchMaxSize()
- || batchBytesSize >= sinkConfig.getBatchMaxBytes()) {
- flush();
- }
- }
-
- public synchronized void close() throws IOException {
- if (scheduledFuture != null) {
- scheduledFuture.cancel(false);
- scheduler.shutdown();
- }
-
- flush();
- }
-
- public synchronized void flush() throws IOException {
- checkFlushException();
- if (batchList.isEmpty()) {
- return;
- }
- String label = createBatchLabel();
- DorisFlushTuple tuple = new DorisFlushTuple(label, batchBytesSize, batchList);
- for (int i = 0; i <= sinkConfig.getMaxRetries(); i++) {
- try {
- Boolean successFlag = dorisStreamLoadVisitor.doStreamLoad(tuple);
- if (successFlag) {
- break;
- }
- } catch (Exception e) {
- log.warn("Writing records to Doris failed, retry times = {}", i, e);
- if (i >= sinkConfig.getMaxRetries()) {
- throw new DorisConnectorException(
- DorisConnectorErrorCode.WRITE_RECORDS_FAILED,
- "The number of retries was exceeded,writing records to Doris failed.",
- e);
- }
-
- if (e instanceof DorisConnectorException
- && ((DorisConnectorException) e).needReCreateLabel()) {
- String newLabel = createBatchLabel();
- log.warn(
- String.format(
- "Batch label changed from [%s] to [%s]",
- tuple.getLabel(), newLabel));
- tuple.setLabel(newLabel);
- }
-
- try {
- long backoff =
- Math.min(
- sinkConfig.getRetryBackoffMultiplierMs() * i,
- sinkConfig.getMaxRetryBackoffMs());
- Thread.sleep(backoff);
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- throw new DorisConnectorException(
- CommonErrorCode.FLUSH_DATA_FAILED,
- "Unable to flush, interrupted while doing another attempt.",
- e);
- }
- }
- }
- batchList.clear();
- batchRowCount = 0;
- batchBytesSize = 0;
- }
-
- private void checkFlushException() {
- if (flushException != null) {
- throw new DorisConnectorException(CommonErrorCode.FLUSH_DATA_FAILED, flushException);
- }
- }
-
- public String createBatchLabel() {
- String labelPrefix = "";
- if (!Strings.isNullOrEmpty(sinkConfig.getLabelPrefix())) {
- labelPrefix = sinkConfig.getLabelPrefix();
- }
- return String.format("%s%s", labelPrefix, UUID.randomUUID().toString());
- }
-}
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/DorisStreamLoadVisitor.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/DorisStreamLoadVisitor.java
deleted file mode 100644
index 5535bee5d0f..00000000000
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/DorisStreamLoadVisitor.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.doris.client;
-
-import org.apache.seatunnel.common.exception.CommonErrorCode;
-import org.apache.seatunnel.common.utils.JsonUtils;
-import org.apache.seatunnel.connectors.doris.config.SinkConfig;
-import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
-import org.apache.seatunnel.connectors.doris.util.DelimiterParserUtil;
-
-import org.apache.commons.codec.binary.Base64;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-@Slf4j
-public class DorisStreamLoadVisitor {
- private final HttpHelper httpHelper = new HttpHelper();
- private static final int MAX_SLEEP_TIME = 5;
-
- private final SinkConfig sinkConfig;
- private long pos;
- private static final String RESULT_FAILED = "Fail";
- private static final String RESULT_SUCCESS = "Success";
- private static final String RESULT_LABEL_EXISTED = "Label Already Exists";
- private static final String LAEBL_STATE_VISIBLE = "VISIBLE";
- private static final String LAEBL_STATE_COMMITTED = "COMMITTED";
- private static final String RESULT_LABEL_PREPARE = "PREPARE";
- private static final String RESULT_LABEL_ABORTED = "ABORTED";
- private static final String RESULT_LABEL_UNKNOWN = "UNKNOWN";
-
- private List<String> fieldNames;
-
- public DorisStreamLoadVisitor(SinkConfig sinkConfig, List<String> fieldNames) {
- this.sinkConfig = sinkConfig;
- this.fieldNames = fieldNames;
- }
-
- public Boolean doStreamLoad(DorisFlushTuple flushData) throws IOException {
- String host = getAvailableHost();
- if (null == host) {
- throw new DorisConnectorException(
- CommonErrorCode.ILLEGAL_ARGUMENT,
- "None of the host in `load_url` could be connected.");
- }
- String loadUrl =
- String.format(
- "%s/api/%s/%s/_stream_load",
- host, sinkConfig.getDatabase(), sinkConfig.getTable());
- if (log.isDebugEnabled()) {
- log.debug(
- String.format(
- "Start to join batch data: rows[%d] bytes[%d] label[%s].",
- flushData.getRows().size(),
- flushData.getBytes(),
- flushData.getLabel()));
- }
- Map<String, Object> loadResult =
- httpHelper.doHttpPut(
- loadUrl,
- joinRows(flushData.getRows(), flushData.getBytes().intValue()),
- getStreamLoadHttpHeader(flushData.getLabel()));
- final String keyStatus = "Status";
- if (null == loadResult || !loadResult.containsKey(keyStatus)) {
- throw new DorisConnectorException(
- CommonErrorCode.FLUSH_DATA_FAILED,
- "Unable to flush data to Doris: unknown result status. " + loadResult);
- }
- if (log.isDebugEnabled()) {
- log.debug(
- String.format("StreamLoad response:\n%s"), JsonUtils.toJsonString(loadResult));
- }
- if (RESULT_FAILED.equals(loadResult.get(keyStatus))) {
- String errorMsg = "Failed to flush data to Doris.\n";
- String message = "";
- if (loadResult.containsKey("Message")) {
- message = loadResult.get("Message") + "\n";
- }
- String errorURL = "";
- if (loadResult.containsKey("ErrorURL")) {
- try {
- errorURL = httpHelper.doHttpGet(loadResult.get("ErrorURL").toString()) + "\n";
- } catch (IOException e) {
- log.warn("Get Error URL failed. {} ", loadResult.get("ErrorURL"), e);
- }
- } else {
- errorURL = JsonUtils.toJsonString(loadResult) + "\n";
- }
- throw new DorisConnectorException(
- CommonErrorCode.FLUSH_DATA_FAILED,
- String.format("%s%s%s", errorMsg, message, errorURL));
- } else if (RESULT_LABEL_EXISTED.equals(loadResult.get(keyStatus))) {
- log.debug(
- String.format("StreamLoad response:\n%s"), JsonUtils.toJsonString(loadResult));
- // has to block-checking the state to get the final result
- checkLabelState(host, flushData.getLabel());
- }
- return RESULT_SUCCESS.equals(loadResult.get(keyStatus));
- }
-
- private String getAvailableHost() {
- List<String> hostList = sinkConfig.getNodeUrls();
- long tmp = pos + hostList.size();
- for (; pos < tmp; pos++) {
- String host = String.format("http://%s", hostList.get((int) (pos % hostList.size())));
- if (httpHelper.tryHttpConnection(host)) {
- return host;
- }
- }
- return null;
- }
-
- private byte[] joinRows(List<byte[]> rows, int totalBytes) {
- if (SinkConfig.StreamLoadFormat.CSV.equals(sinkConfig.getLoadFormat())) {
- Map<String, Object> props = sinkConfig.getStreamLoadProps();
- byte[] lineDelimiter =
- DelimiterParserUtil.parse((String) props.get("row_delimiter"), "\n")
- .getBytes(StandardCharsets.UTF_8);
- ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length);
- for (byte[] row : rows) {
- bos.put(row);
- bos.put(lineDelimiter);
- }
- return bos.array();
- }
-
- if (SinkConfig.StreamLoadFormat.JSON.equals(sinkConfig.getLoadFormat())) {
- ByteBuffer bos =
- ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1));
- bos.put("[".getBytes(StandardCharsets.UTF_8));
- byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);
- boolean isFirstElement = true;
- for (byte[] row : rows) {
- if (!isFirstElement) {
- bos.put(jsonDelimiter);
- }
- bos.put(row);
- isFirstElement = false;
- }
- bos.put("]".getBytes(StandardCharsets.UTF_8));
- return bos.array();
- }
- throw new DorisConnectorException(
- CommonErrorCode.FLUSH_DATA_FAILED,
- "Failed to join rows data, unsupported `format` from stream load properties:");
- }
-
- @SuppressWarnings("unchecked")
- private void checkLabelState(String host, String label) throws IOException {
- int idx = 0;
- while (true) {
- try {
- TimeUnit.SECONDS.sleep(Math.min(++idx, MAX_SLEEP_TIME));
- } catch (InterruptedException ex) {
- break;
- }
- try {
- String queryLoadStateUrl =
- String.format(
- "%s/api/%s/get_load_state?label=%s",
- host, sinkConfig.getDatabase(), label);
- Map<String, Object> result =
- httpHelper.doHttpGet(queryLoadStateUrl, getLoadStateHttpHeader(label));
- if (result == null) {
- throw new DorisConnectorException(
- CommonErrorCode.FLUSH_DATA_FAILED,
- String.format(
- "Failed to flush data to Doris, Error "
- + "could not get the final state of label[%s].\n",
- label),
- null);
- }
- String labelState = (String) result.get("state");
- if (null == labelState) {
- throw new DorisConnectorException(
- CommonErrorCode.FLUSH_DATA_FAILED,
- String.format(
- "Failed to flush data to Doris, Error "
- + "could not get the final state of label[%s]. response[%s]\n",
- label, JsonUtils.toJsonString(result)),
- null);
- }
- log.info(String.format("Checking label[%s] state[%s]\n", label, labelState));
- switch (labelState) {
- case LAEBL_STATE_VISIBLE:
- case LAEBL_STATE_COMMITTED:
- return;
- case RESULT_LABEL_PREPARE:
- continue;
- case RESULT_LABEL_ABORTED:
- throw new DorisConnectorException(
- CommonErrorCode.FLUSH_DATA_FAILED,
- String.format(
- "Failed to flush data to Doris, Error "
- + "label[%s] state[%s]\n",
- label, labelState),
- true);
- case RESULT_LABEL_UNKNOWN:
- default:
- throw new DorisConnectorException(
- CommonErrorCode.FLUSH_DATA_FAILED,
- String.format(
- "Failed to flush data to Doris, Error "
- + "label[%s] state[%s]\n",
- label, labelState));
- }
- } catch (IOException e) {
- throw new DorisConnectorException(CommonErrorCode.FLUSH_DATA_FAILED, e);
- }
- }
- }
-
- private String getBasicAuthHeader(String username, String password) {
- String auth = username + ":" + password;
- byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(StandardCharsets.UTF_8));
- return String.format("Basic %s", new String(encodedAuth));
- }
-
- private Map<String, String> getStreamLoadHttpHeader(String label) {
- Map<String, String> headerMap = new HashMap<>();
- if (null != fieldNames
- && !fieldNames.isEmpty()
- && SinkConfig.StreamLoadFormat.CSV.equals(sinkConfig.getLoadFormat())) {
- headerMap.put(
- "columns",
- String.join(
- ",",
- fieldNames.stream()
- .map(f -> String.format("`%s`", f))
- .collect(Collectors.toList())));
- }
- if (null != sinkConfig.getStreamLoadProps()) {
- for (Map.Entry<String, Object> entry : sinkConfig.getStreamLoadProps().entrySet()) {
- headerMap.put(entry.getKey(), String.valueOf(entry.getValue()));
- }
- }
- headerMap.put("strip_outer_array", "true");
- headerMap.put("Expect", "100-continue");
- headerMap.put("label", label);
- headerMap.put("Content-Type", "application/x-www-form-urlencoded");
- headerMap.put(
- "Authorization",
- getBasicAuthHeader(sinkConfig.getUsername(), sinkConfig.getPassword()));
- return headerMap;
- }
-
- private Map<String, String> getLoadStateHttpHeader(String label) {
- Map<String, String> headerMap = new HashMap<>();
- headerMap.put(
- "Authorization",
- getBasicAuthHeader(sinkConfig.getUsername(), sinkConfig.getPassword()));
- headerMap.put("Connection", "close");
- return headerMap;
- }
-}
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/HttpHelper.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/HttpHelper.java
deleted file mode 100644
index bf39f2c7c45..00000000000
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/HttpHelper.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.doris.client;
-
-import org.apache.seatunnel.common.utils.JsonUtils;
-
-import org.apache.http.HttpEntity;
-import org.apache.http.HttpRequestInterceptor;
-import org.apache.http.HttpStatus;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.methods.HttpPut;
-import org.apache.http.entity.ByteArrayEntity;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.DefaultRedirectStrategy;
-import org.apache.http.impl.client.HttpClientBuilder;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.protocol.HTTP;
-import org.apache.http.util.EntityUtils;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.util.HashMap;
-import java.util.Map;
-
-@Slf4j
-public class HttpHelper {
- private static final int DEFAULT_CONNECT_TIMEOUT = 1000000;
-
- public HttpEntity getHttpEntity(CloseableHttpResponse resp) {
- int code = resp.getStatusLine().getStatusCode();
- if (HttpStatus.SC_OK != code) {
- log.warn("Request failed with code:{}", code);
- return null;
- }
- HttpEntity respEntity = resp.getEntity();
- if (null == respEntity) {
- log.warn("Request failed with empty response.");
- return null;
- }
- return respEntity;
- }
-
- public String doHttpGet(String getUrl) throws IOException {
- log.info("Executing GET from {}.", getUrl);
- try (CloseableHttpClient httpclient = buildHttpClient()) {
- HttpGet httpGet = new HttpGet(getUrl);
- try (CloseableHttpResponse resp = httpclient.execute(httpGet)) {
- HttpEntity respEntity = resp.getEntity();
- if (null == respEntity) {
- log.warn("Request failed with empty response.");
- return null;
- }
- return EntityUtils.toString(respEntity);
- }
- }
- }
-
- public Map<String, Object> doHttpGet(String getUrl, Map<String, String> header)
- throws IOException {
- log.info("Executing GET from {}.", getUrl);
- try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
- HttpGet httpGet = new HttpGet(getUrl);
- if (null != header) {
- for (Map.Entry<String, String> entry : header.entrySet()) {
- httpGet.setHeader(entry.getKey(), String.valueOf(entry.getValue()));
- }
- }
- try (CloseableHttpResponse resp = httpclient.execute(httpGet)) {
- HttpEntity respEntity = getHttpEntity(resp);
- if (null == respEntity) {
- log.warn("Request failed with empty response.");
- return null;
- }
- return JsonUtils.parseObject(EntityUtils.toString(respEntity), Map.class);
- }
- }
- }
-
- @SuppressWarnings("unchecked")
- public Map<String, Object> doHttpPut(String url, byte[] data, Map<String, String> header)
- throws IOException {
- final HttpClientBuilder httpClientBuilder =
- HttpClients.custom()
- .addInterceptorFirst(
- (HttpRequestInterceptor)
- (request, context) -> {
- // fighting org.apache.http.protocol.RequestContent's
- // ProtocolException("Content-Length header already
- // present");
- request.removeHeaders(HTTP.CONTENT_LEN);
- })
- .setRedirectStrategy(
- new DefaultRedirectStrategy() {
- @Override
- protected boolean isRedirectable(String method) {
- return true;
- }
- });
- try (CloseableHttpClient httpclient = httpClientBuilder.build()) {
- HttpPut httpPut = new HttpPut(url);
- if (null != header) {
- for (Map.Entry<String, String> entry : header.entrySet()) {
- httpPut.setHeader(entry.getKey(), String.valueOf(entry.getValue()));
- }
- }
- httpPut.setEntity(new ByteArrayEntity(data));
- httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build());
- try (CloseableHttpResponse resp = httpclient.execute(httpPut)) {
- int code = resp.getStatusLine().getStatusCode();
- if (HttpStatus.SC_OK != code) {
- String errorText;
- try {
- HttpEntity respEntity = resp.getEntity();
- errorText = EntityUtils.toString(respEntity);
- } catch (Exception err) {
- errorText = "find errorText failed: " + err.getMessage();
- }
- log.warn("Request failed with code:{}, err:{}", code, errorText);
- Map<String, Object> errorMap = new HashMap<>();
- errorMap.put("Status", "Fail");
- errorMap.put("Message", errorText);
- return errorMap;
- }
- HttpEntity respEntity = resp.getEntity();
- if (null == respEntity) {
- log.warn("Request failed with empty response.");
- return null;
- }
- return JsonUtils.parseObject(EntityUtils.toString(respEntity), Map.class);
- }
- }
- }
-
- private CloseableHttpClient buildHttpClient() {
- final HttpClientBuilder httpClientBuilder =
- HttpClients.custom()
- .setRedirectStrategy(
- new DefaultRedirectStrategy() {
- @Override
- protected boolean isRedirectable(String method) {
- return true;
- }
- });
- return httpClientBuilder.build();
- }
-
- public boolean tryHttpConnection(String host) {
- try {
- URL url = new URL(host);
- HttpURLConnection co = (HttpURLConnection) url.openConnection();
- co.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT);
- co.connect();
- co.disconnect();
- return true;
- } catch (Exception e1) {
- log.warn("Failed to connect to address:{}", host, e1);
- return false;
- }
- }
-}
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisConfig.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisConfig.java
new file mode 100644
index 00000000000..d58ea0757ad
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisConfig.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.doris.config;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+import java.util.Map;
+import java.util.Properties;
+
+@Setter
+@Getter
+@ToString
+public class DorisConfig {
+ public static final int DORIS_TABLET_SIZE_MIN = 1;
+ public static final int DORIS_TABLET_SIZE_DEFAULT = Integer.MAX_VALUE;
+ public static final int DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT = 30 * 1000;
+ public static final int DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT = 30 * 1000;
+ private static final int DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT = 3600;
+ public static final int DORIS_REQUEST_RETRIES_DEFAULT = 3;
+ private static final Boolean DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT = false;
+ private static final int DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT = 64;
+ private static final int DORIS_BATCH_SIZE_DEFAULT = 1024;
+ private static final long DORIS_EXEC_MEM_LIMIT_DEFAULT = 2147483648L;
+ private static final int DEFAULT_SINK_CHECK_INTERVAL = 10000;
+ private static final int DEFAULT_SINK_MAX_RETRIES = 3;
+ private static final int DEFAULT_SINK_BUFFER_SIZE = 256 * 1024;
+ private static final int DEFAULT_SINK_BUFFER_COUNT = 3;
+ // common option
+ public static final Option<String> FENODES =
+ Options.key("fenodes")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("doris fe http address.");
+
+ public static final Option<String> TABLE_IDENTIFIER =
+ Options.key("table.identifier")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("the doris table name.");
+ public static final Option<String> USERNAME =
+ Options.key("username")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("the doris user name.");
+ public static final Option<String> PASSWORD =
+ Options.key("password")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("the doris password.");
+
+ // source config options
+ public static final Option<String> DORIS_READ_FIELD =
+ Options.key("doris.read.field")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "List of column names in the Doris table, separated by commas");
+ public static final Option<String> DORIS_FILTER_QUERY =
+ Options.key("doris.filter.query")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Filter expression of the query, which is passed through to Doris unchanged. Doris uses this expression to filter data on the source side.");
+ public static final Option<Integer> DORIS_TABLET_SIZE =
+ Options.key("doris.request.tablet.size")
+ .intType()
+ .defaultValue(DORIS_TABLET_SIZE_DEFAULT)
+ .withDescription("");
+ public static final Option<Integer> DORIS_REQUEST_CONNECT_TIMEOUT_MS =
+ Options.key("doris.request.connect.timeout.ms")
+ .intType()
+ .defaultValue(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT)
+ .withDescription("the connection timeout, in milliseconds, for requests sent to Doris.");
+ public static final Option<Integer> DORIS_REQUEST_READ_TIMEOUT_MS =
+ Options.key("doris.request.read.timeout.ms")
+ .intType()
+ .defaultValue(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT)
+ .withDescription("the read timeout, in milliseconds, for requests sent to Doris.");
+ public static final Option<Integer> DORIS_REQUEST_QUERY_TIMEOUT_S =
+ Options.key("doris.request.query.timeout.s")
+ .intType()
+ .defaultValue(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT)
+ .withDescription("the timeout, in seconds, for queries sent to Doris.");
+ public static final Option<Integer> DORIS_REQUEST_RETRIES =
+ Options.key("doris.request.retries")
+ .intType()
+ .defaultValue(DORIS_REQUEST_RETRIES_DEFAULT)
+ .withDescription("the number of retries for requests sent to Doris.");
+ public static final Option<Boolean> DORIS_DESERIALIZE_ARROW_ASYNC =
+ Options.key("doris.deserialize.arrow.async")
+ .booleanType()
+ .defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT)
+ .withDescription("whether to deserialize Arrow data asynchronously.");
+ public static final Option<Integer> DORIS_DESERIALIZE_QUEUE_SIZE =
+ Options.key("doris.deserialize.queue.size")
+ .intType()
+ .defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT)
+ .withDescription("the size of the queue used for asynchronous Arrow deserialization.");
+ public static final Option<Integer> DORIS_BATCH_SIZE =
+ Options.key("doris.batch.size")
+ .intType()
+ .defaultValue(DORIS_BATCH_SIZE_DEFAULT)
+ .withDescription("the maximum number of rows to read from Doris in one batch.");
+ public static final Option<Long> DORIS_EXEC_MEM_LIMIT =
+ Options.key("doris.exec.mem.limit")
+ .longType()
+ .defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT)
+ .withDescription("the memory limit for a single query, in bytes.");
+ public static final Option<Boolean> SOURCE_USE_OLD_API =
+ Options.key("source.use-old-api")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to read data using the old API instead of the new interface defined according to the FLIP-27 specification. Default: false.");
+
+ // sink config options
+ public static final Option<Boolean> SINK_ENABLE_2PC =
+ Options.key("sink.enable-2pc")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("whether to enable two-phase commit (2PC) while loading.");
+
+ public static final Option<Integer> SINK_CHECK_INTERVAL =
+ Options.key("sink.check-interval")
+ .intType()
+ .defaultValue(DEFAULT_SINK_CHECK_INTERVAL)
+ .withDescription("the interval at which exceptions are checked while loading.");
+ public static final Option<Integer> SINK_MAX_RETRIES =
+ Options.key("sink.max-retries")
+ .intType()
+ .defaultValue(DEFAULT_SINK_MAX_RETRIES)
+ .withDescription("the maximum number of retries if writing records to the database fails.");
+ public static final Option<Integer> SINK_BUFFER_SIZE =
+ Options.key("sink.buffer-size")
+ .intType()
+ .defaultValue(DEFAULT_SINK_BUFFER_SIZE)
+ .withDescription("the buffer size to cache data for stream load.");
+ public static final Option<Integer> SINK_BUFFER_COUNT =
+ Options.key("sink.buffer-count")
+ .intType()
+ .defaultValue(DEFAULT_SINK_BUFFER_COUNT)
+ .withDescription("the buffer count to cache data for stream load.");
+ public static final Option<String> SINK_LABEL_PREFIX =
+ Options.key("sink.label-prefix")
+ .stringType()
+ .defaultValue("")
+ .withDescription("the unique label prefix.");
+ public static final Option<Boolean> SINK_ENABLE_DELETE =
+ Options.key("sink.enable-delete")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("whether to enable the delete function.");
+
+ public static final Option