diff --git a/docs/en/connector-v2/sink/IoTDB.md b/docs/en/connector-v2/sink/IoTDB.md
index e7f5bc277a6..554d0bfd06e 100644
--- a/docs/en/connector-v2/sink/IoTDB.md
+++ b/docs/en/connector-v2/sink/IoTDB.md
@@ -50,10 +50,10 @@ There is a conflict of thrift version between IoTDB and Spark.Therefore, you nee
| Name | Type | Required | Default | Description |
|-----------------------------|---------|----------|--------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| node_urls | Array | Yes | - | `IoTDB` cluster address, the format is `["host:port", ...]` |
+| node_urls | String | Yes | - | `IoTDB` cluster address, the format is `"host1:port"` or `"host1:port,host2:port"` |
| username | String | Yes | - | `IoTDB` user username |
| password | String | Yes | - | `IoTDB` user password |
-| key_device | String | No | - | Specify field name of the `IoTDB` deviceId in SeaTunnelRow |
+| key_device | String | Yes | - | Specify field name of the `IoTDB` deviceId in SeaTunnelRow |
| key_timestamp | String | No | processing time | Specify field-name of the `IoTDB` timestamp in SeaTunnelRow. If not specified, use processing-time as timestamp |
| key_measurement_fields | Array | No | exclude `device` & `timestamp` | Specify field-name of the `IoTDB` measurement list in SeaTunnelRow. If not specified, include all fields but exclude `device` & `timestamp` |
| storage_group | Array | No | - | Specify device storage group(path prefix)
example: deviceId = ${storage_group} + "." + ${key_device} |
@@ -68,78 +68,124 @@ There is a conflict of thrift version between IoTDB and Spark.Therefore, you nee
| connection_timeout_in_ms | Integer | No | - | The maximum time (in ms) to wait when connecting to `IoTDB` |
| common-options | | no | - | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details |
-## Task Example
+## Examples
+
+```hocon
+env {
+ execution.parallelism = 2
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ row.num = 16
+ bigint.template = [1664035200001]
+ schema = {
+ fields {
+ device_name = "string"
+ temperature = "float"
+ moisture = "int"
+ event_ts = "bigint"
+ c_string = "string"
+ c_boolean = "boolean"
+ c_tinyint = "tinyint"
+ c_smallint = "smallint"
+ c_int = "int"
+ c_bigint = "bigint"
+ c_float = "float"
+ c_double = "double"
+ }
+ }
+ }
+}
+
+...
+
+```
+
+Upstream SeaTunnelRow data format is the following:
+
+| device_name | temperature | moisture | event_ts | c_string | c_boolean | c_tinyint | c_smallint | c_int | c_bigint | c_float | c_double |
+|--------------------------|-------------|----------|---------------|----------|-----------|-----------|------------|-------|------------|---------|----------|
+| root.test_group.device_a | 36.1 | 100 | 1664035200001 | abc1 | true | 1 | 1 | 1 | 2147483648 | 1.0 | 1.0 |
+| root.test_group.device_b | 36.2 | 101 | 1664035200001 | abc2 | false | 2 | 2 | 2 | 2147483649 | 2.0 | 2.0 |
+| root.test_group.device_c | 36.3 | 102 | 1664035200001 | abc3 | false | 3 | 3 | 3 | 2147483649 | 3.0 | 3.0 |
### Case1
-Common options:
+only fill required config.
+use current processing time as timestamp. and include all fields but exclude `device` & `timestamp` as measurement fields
```hocon
sink {
IoTDB {
- node_urls = ["localhost:6667"]
+ node_urls = "localhost:6667"
username = "root"
password = "root"
- batch_size = 1024
+ key_device = "device_name" # specify the `deviceId` use device_name field
}
}
```
-When you assign `key_device` is `device_name`, for example:
+Output to `IoTDB` data format is the following:
+
+```shell
+IoTDB> SELECT * FROM root.test_group.* align by device;
++------------------------+------------------------+--------------+-----------+--------------+---------+----------+----------+-----------+------+-----------+--------+---------+
+| Time| Device| temperature| moisture| event_ts| c_string| c_boolean| c_tinyint| c_smallint| c_int| c_bigint| c_float| c_double|
++------------------------+------------------------+--------------+-----------+--------------+---------+----------+----------+-----------+------+-----------+--------+---------+
+|2023-09-01T00:00:00.001Z|root.test_group.device_a| 36.1| 100| 1664035200001| abc1| true| 1| 1| 1| 2147483648| 1.0| 1.0|
+|2023-09-01T00:00:00.001Z|root.test_group.device_b| 36.2| 101| 1664035200001| abc2| false| 2| 2| 2| 2147483649| 2.0| 2.0|
+|2023-09-01T00:00:00.001Z|root.test_group.device_c| 36.3| 102| 1664035200001| abc2| false| 3| 3| 3| 2147483649| 3.0| 3.0|
++------------------------+------------------------+--------------+-----------+--------------+---------+---------+-----------+-----------+------+-----------+--------+---------+
+```
+
+### Case2
+
+use source event's time
```hocon
sink {
IoTDB {
- ...
- key_device = "device_name"
+ node_urls = "localhost:6667"
+ username = "root"
+ password = "root"
+ key_device = "device_name" # specify the `deviceId` use device_name field
+ key_timestamp = "event_ts" # specify the `timestamp` use event_ts field
}
}
```
-Upstream SeaTunnelRow data format is the following:
-
-| device_name | field_1 | field_2 |
-|--------------------------|---------|---------|
-| root.test_group.device_a | 1001 | 1002 |
-| root.test_group.device_b | 2001 | 2002 |
-| root.test_group.device_c | 3001 | 3002 |
-
Output to `IoTDB` data format is the following:
```shell
IoTDB> SELECT * FROM root.test_group.* align by device;
-+------------------------+------------------------+-----------+----------+
-| Time| Device| field_1| field_2|
-+------------------------+------------------------+----------+-----------+
-|2022-09-26T17:50:01.201Z|root.test_group.device_a| 1001| 1002|
-|2022-09-26T17:50:01.202Z|root.test_group.device_b| 2001| 2002|
-|2022-09-26T17:50:01.203Z|root.test_group.device_c| 3001| 3002|
-+------------------------+------------------------+----------+-----------+
++------------------------+------------------------+--------------+-----------+--------------+---------+----------+----------+-----------+------+-----------+--------+---------+
+| Time| Device| temperature| moisture| event_ts| c_string| c_boolean| c_tinyint| c_smallint| c_int| c_bigint| c_float| c_double|
++------------------------+------------------------+--------------+-----------+--------------+---------+----------+----------+-----------+------+-----------+--------+---------+
+|2022-09-25T00:00:00.001Z|root.test_group.device_a| 36.1| 100| 1664035200001| abc1| true| 1| 1| 1| 2147483648| 1.0| 1.0|
+|2022-09-25T00:00:00.001Z|root.test_group.device_b| 36.2| 101| 1664035200001| abc2| false| 2| 2| 2| 2147483649| 2.0| 2.0|
+|2022-09-25T00:00:00.001Z|root.test_group.device_c| 36.3| 102| 1664035200001| abc2| false| 3| 3| 3| 2147483649| 3.0| 3.0|
++------------------------+------------------------+--------------+-----------+--------------+---------+---------+-----------+-----------+------+-----------+--------+---------+
```
-### Case2
+### Case3
-When you assign `key_device`、`key_timestamp`、`key_measurement_fields`, for example:
+use source event's time and limit measurement fields
```hocon
sink {
IoTDB {
- ...
+ node_urls = "localhost:6667"
+ username = "root"
+ password = "root"
key_device = "device_name"
- key_timestamp = "ts"
+ key_timestamp = "event_ts"
key_measurement_fields = ["temperature", "moisture"]
}
}
```
-Upstream SeaTunnelRow data format is the following:
-
-| ts | device_name | field_1 | field_2 | temperature | moisture |
-|---------------|--------------------------|---------|---------|-------------|----------|
-| 1664035200001 | root.test_group.device_a | 1001 | 1002 | 36.1 | 100 |
-| 1664035200001 | root.test_group.device_b | 2001 | 2002 | 36.2 | 101 |
-| 1664035200001 | root.test_group.device_c | 3001 | 3002 | 36.3 | 102 |
-
Output to `IoTDB` data format is the following:
```shell
diff --git a/docs/en/connector-v2/source/IoTDB.md b/docs/en/connector-v2/source/IoTDB.md
index a20680ce638..da0f198d3e1 100644
--- a/docs/en/connector-v2/source/IoTDB.md
+++ b/docs/en/connector-v2/source/IoTDB.md
@@ -2,14 +2,16 @@
> IoTDB source connector
-## Description
+## Support Those Engines
-Read external data source data through IoTDB.
+> Spark
+> Flink
+> SeaTunnel Zeta
## Key features
- [x] [batch](../../concept/connector-v2-features.md)
-- [ ] [stream](../../concept/connector-v2-features.md)
+- [x] [stream](../../concept/connector-v2-features.md)
- [x] [exactly-once](../../concept/connector-v2-features.md)
- [x] [column projection](../../concept/connector-v2-features.md)
@@ -18,106 +20,53 @@ supports query SQL and can achieve projection effect.
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
-## Options
-
-| name | type | required | default value |
-|----------------------------|---------|----------|---------------|
-| host | string | no | - |
-| port | int | no | - |
-| node_urls | string | no | - |
-| username | string | yes | - |
-| password | string | yes | - |
-| sql | string | yes | - |
-| schema | config | yes | - |
-| fetch_size | int | no | - |
-| lower_bound | long | no | - |
-| upper_bound | long | no | - |
-| num_partitions | int | no | - |
-| thrift_default_buffer_size | int | no | - |
-| enable_cache_leader | boolean | no | - |
-| version | string | no | - |
-| common-options | | no | - |
-
-### single node, you need to set host and port to connect to the remote data source.
-
-**host** [string] the host of the IoTDB when you select host of the IoTDB
-
-**port** [int] the port of the IoTDB when you select
-
-### multi node, you need to set node_urls to connect to the remote data source.
-
-**node_urls** [string] the node_urls of the IoTDB when you select
-
-e.g.
-
-```text
-127.0.0.1:8080,127.0.0.2:8080
-```
-
-### other parameters
-
-**sql** [string]
-execute sql statement e.g.
-
-```
-select name,age from test
-```
-
-### schema [config]
-
-#### fields [Config]
-
-The schema of the IoTDB that you want to generate
-
-e.g.
-
-```
-schema {
- fields {
- name = string
- age = int
- }
- }
-```
-
-### option parameters
-
-### fetch_size [int]
-
-the fetch_size of the IoTDB when you select
-
-### username [string]
-
-the username of the IoTDB when you select
-
-### password [string]
-
-the password of the IoTDB when you select
-
-### lower_bound [long]
-
-the lower_bound of the IoTDB when you select
-
-### upper_bound [long]
-
-the upper_bound of the IoTDB when you select
-
-### num_partitions [int]
-
-the num_partitions of the IoTDB when you select
-
-### thrift_default_buffer_size [int]
-
-the thrift_default_buffer_size of the IoTDB when you select
-
-### enable_cache_leader [boolean]
-
-enable_cache_leader of the IoTDB when you select
+## Description
-### version [string]
+Read external data source data through IoTDB.
-Version represents the SQL semantic version used by the client, which is used to be compatible with the SQL semantics of
-0.12 when upgrading 0.13. The possible values are: V_0_12, V_0_13.
+:::tip
+
+There is a conflict of thrift version between IoTDB and Spark.Therefore, you need to execute `rm -f $SPARK_HOME/jars/libthrift*` and `cp $IOTDB_HOME/lib/libthrift* $SPARK_HOME/jars/` to resolve it.
+
+:::
+
+## Supported DataSource Info
+
+| Datasource | Supported Versions | Url |
+|------------|--------------------|----------------|
+| IoTDB | `>= 0.13.0` | localhost:6667 |
+
+## Data Type Mapping
+
+| IotDB Data type | SeaTunnel Data type |
+|-----------------|---------------------|
+| BOOLEAN | BOOLEAN |
+| INT32 | TINYINT |
+| INT32 | SMALLINT |
+| INT32 | INT |
+| INT64 | BIGINT |
+| FLOAT | FLOAT |
+| DOUBLE | DOUBLE |
+| TEXT | STRING |
+
+## Source Options
+
+| Name | Type | Required | Default Value | Description |
+|----------------------------|---------|----------|---------------|------------------------------------------------------------------------------------|
+| node_urls | string | yes | - | `IoTDB` cluster address, the format is `"host1:port"` or `"host1:port,host2:port"` |
+| username | string | yes | - | `IoTDB` user username |
+| password | string | yes | - | `IoTDB` user password |
+| sql | string | yes | - | execute sql statement |
+| schema | config | yes | - | the data schema |
+| fetch_size | int | no | - | the fetch_size of the IoTDB when you select |
+| lower_bound | long | no | - | the lower_bound of the IoTDB when you select |
+| upper_bound | long | no | - | the upper_bound of the IoTDB when you select |
+| num_partitions | int | no | - | the num_partitions of the IoTDB when you select |
+| thrift_default_buffer_size | int | no | - | the thrift_default_buffer_size of the IoTDB when you select |
+| thrift_max_frame_size | int | no | - | the thrift max frame size |
+| enable_cache_leader | boolean | no | - | enable_cache_leader of the IoTDB when you select |
+| version | string | no | - | SQL semantic version used by the client, The possible values are: V_0_12, V_0_13 |
+| common-options | | no | - | |
### split partitions
@@ -157,37 +106,37 @@ Source plugin common parameters, please refer to [Source Common Options](common-
## Examples
-### Case1
-
-Common options:
-
```hocon
+env {
+ execution.parallelism = 2
+ job.mode = "BATCH"
+}
+
source {
IoTDB {
node_urls = "localhost:6667"
username = "root"
password = "root"
+ sql = "SELECT temperature, moisture, c_int, c_bigint, c_float, c_double, c_string, c_boolean FROM root.test_group.* WHERE time < 4102329600000 align by device"
+ schema {
+ fields {
+ ts = timestamp
+ device_name = string
+ temperature = float
+ moisture = bigint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_string = string
+ c_boolean = boolean
+ }
+ }
}
}
-```
-
-When you assign `sql`、`fields`、`partition`, for example:
-```hocon
sink {
- IoTDB {
- ...
- sql = "SELECT temperature, moisture FROM root.test_group.* WHERE time < 4102329600000 align by device"
- lower_bound = 1
- upper_bound = 4102329600000
- num_partitions = 10
- fields {
- ts = bigint
- device_name = string
-
- temperature = float
- moisture = bigint
- }
+ Console {
}
}
```
@@ -195,23 +144,23 @@ sink {
Upstream `IoTDB` data format is the following:
```shell
-IoTDB> SELECT temperature, moisture FROM root.test_group.* WHERE time < 4102329600000 align by device;
-+------------------------+------------------------+--------------+-----------+
-| Time| Device| temperature| moisture|
-+------------------------+------------------------+--------------+-----------+
-|2022-09-25T00:00:00.001Z|root.test_group.device_a| 36.1| 100|
-|2022-09-25T00:00:00.001Z|root.test_group.device_b| 36.2| 101|
-|2022-09-25T00:00:00.001Z|root.test_group.device_c| 36.3| 102|
-+------------------------+------------------------+--------------+-----------+
+IoTDB> SELECT temperature, moisture, c_int, c_bigint, c_float, c_double, c_string, c_boolean FROM root.test_group.* WHERE time < 4102329600000 align by device;
++------------------------+------------------------+--------------+-----------+--------+--------------+----------+---------+---------+----------+
+| Time| Device| temperature| moisture| c_int| c_bigint| c_float| c_double| c_string| c_boolean|
++------------------------+------------------------+--------------+-----------+--------+--------------+----------+---------+---------+----------+
+|2022-09-25T00:00:00.001Z|root.test_group.device_a| 36.1| 100| 1| 21474836470| 1.0f| 1.0d| abc| true|
+|2022-09-25T00:00:00.001Z|root.test_group.device_b| 36.2| 101| 2| 21474836470| 2.0f| 2.0d| abc| true|
+|2022-09-25T00:00:00.001Z|root.test_group.device_c| 36.3| 102| 3| 21474836470| 3.0f| 3.0d| abc| true|
++------------------------+------------------------+--------------+-----------+--------+--------------+----------+---------+---------+----------+
```
Loaded to SeaTunnelRow data format is the following:
-| ts | device_name | temperature | moisture |
-|---------------|--------------------------|-------------|----------|
-| 1664035200001 | root.test_group.device_a | 36.1 | 100 |
-| 1664035200001 | root.test_group.device_b | 36.2 | 101 |
-| 1664035200001 | root.test_group.device_c | 36.3 | 102 |
+| ts | device_name | temperature | moisture | c_int | c_bigint | c_float | c_double | c_string | c_boolean |
+|---------------|--------------------------|-------------|----------|-------|-------------|---------|----------|----------|-----------|
+| 1664035200001 | root.test_group.device_a | 36.1 | 100 | 1 | 21474836470 | 1.0f | 1.0d | abc | true |
+| 1664035200001 | root.test_group.device_b | 36.2 | 101 | 2 | 21474836470 | 2.0f | 2.0d | abc | true |
+| 1664035200001 | root.test_group.device_c | 36.3 | 102 | 3 | 21474836470 | 3.0f | 3.0d | abc | true |
## Changelog
diff --git a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/SourceConfig.java b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/SourceConfig.java
index be96a7d9187..ac515ef37a9 100644
--- a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/SourceConfig.java
+++ b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/SourceConfig.java
@@ -31,20 +31,6 @@ public class SourceConfig {
public static final Option SQL =
Options.key("sql").stringType().noDefaultValue().withDescription("sql");
- /*---------------------- single node configurations -------------------------*/
-
- /** The host of the IotDB server. */
- public static final Option HOST =
- Options.key("host").stringType().noDefaultValue().withDescription("host");
-
- /*
- * The port of the IotDB server.
- */
- public static final Option PORT =
- Options.key("port").intType().noDefaultValue().withDescription("port");
-
- /*---------------------- multiple node configurations -------------------------*/
-
/** Username for the source. */
public static final Option USERNAME =
Options.key("username").stringType().noDefaultValue().withDescription("usernam");
@@ -53,7 +39,7 @@ public class SourceConfig {
public static final Option PASSWORD =
Options.key("password").stringType().noDefaultValue().withDescription("password");
- /** multiple nodes */
+ /** node urls */
public static final Option NODE_URLS =
Options.key("node_urls").stringType().noDefaultValue().withDescription("node urls");
diff --git a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSource.java b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSource.java
index 7f2960ae007..0c171ada4fc 100644
--- a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSource.java
+++ b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSource.java
@@ -43,9 +43,7 @@
import java.util.HashMap;
import java.util.Map;
-import static org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.HOST;
import static org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.NODE_URLS;
-import static org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.PORT;
@AutoService(SeaTunnelSource.class)
public class IoTDBSource
@@ -66,11 +64,7 @@ public String getPluginName() {
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
- CheckResult urlCheckResult =
- CheckConfigUtil.checkAllExists(pluginConfig, HOST.key(), PORT.key());
- if (!urlCheckResult.isSuccess()) {
- urlCheckResult = CheckConfigUtil.checkAllExists(pluginConfig, NODE_URLS.key());
- }
+ CheckResult urlCheckResult = CheckConfigUtil.checkAllExists(pluginConfig, NODE_URLS.key());
CheckResult schemaCheckResult =
CheckConfigUtil.checkAllExists(pluginConfig, CatalogTableUtil.SCHEMA.key());
CheckResult mergedConfigCheck =
diff --git a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceFactory.java b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceFactory.java
index c697df70129..2c2a521fd84 100644
--- a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceFactory.java
@@ -30,10 +30,8 @@
import static org.apache.seatunnel.connectors.seatunnel.iotdb.config.CommonConfig.USERNAME;
import static org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.ENABLE_CACHE_LEADER;
import static org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.FETCH_SIZE;
-import static org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.HOST;
import static org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.LOWER_BOUND;
import static org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.NUM_PARTITIONS;
-import static org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.PORT;
import static org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.SQL;
import static org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.THRIFT_DEFAULT_BUFFER_SIZE;
import static org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.THRIFT_MAX_FRAME_SIZE;
@@ -52,8 +50,6 @@ public OptionRule optionRule() {
return OptionRule.builder()
.required(NODE_URLS, USERNAME, PASSWORD, SQL, SCHEMA)
.optional(
- HOST,
- PORT,
FETCH_SIZE,
THRIFT_DEFAULT_BUFFER_SIZE,
THRIFT_MAX_FRAME_SIZE,
diff --git a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceReader.java b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceReader.java
index c4ecd9dc81d..546487825c3 100644
--- a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceReader.java
+++ b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceReader.java
@@ -46,10 +46,8 @@
import static org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.ENABLE_CACHE_LEADER;
import static org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.FETCH_SIZE;
-import static org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.HOST;
import static org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.NODE_URLS;
import static org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.PASSWORD;
-import static org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.PORT;
import static org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.THRIFT_DEFAULT_BUFFER_SIZE;
import static org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.THRIFT_MAX_FRAME_SIZE;
import static org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.USERNAME;
@@ -130,17 +128,10 @@ private void read(IoTDBSourceSplit split, Collector output) throws
private Session buildSession(Map conf) {
Session.Builder sessionBuilder = new Session.Builder();
- if (conf.containsKey(HOST.key())) {
- sessionBuilder
- .host((String) conf.get(HOST.key()))
- .port(Integer.parseInt(conf.get(PORT.key()).toString()))
- .build();
- } else {
- String nodeUrlsString = (String) conf.get(NODE_URLS.key());
- List nodes =
- Stream.of(nodeUrlsString.split(NODES_SPLIT)).collect(Collectors.toList());
- sessionBuilder.nodeUrls(nodes);
- }
+ String nodeUrlsString = (String) conf.get(NODE_URLS.key());
+ List nodes =
+ Stream.of(nodeUrlsString.split(NODES_SPLIT)).collect(Collectors.toList());
+ sessionBuilder.nodeUrls(nodes);
if (null != conf.get(FETCH_SIZE.key())) {
sessionBuilder.fetchSize(Integer.parseInt(conf.get(FETCH_SIZE.key()).toString()));
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iotdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iotdb/IoTDBIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iotdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iotdb/IoTDBIT.java
index 94bfbe917e2..8b8d6acd77b 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iotdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iotdb/IoTDBIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iotdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iotdb/IoTDBIT.java
@@ -63,7 +63,7 @@
value = {},
type = {EngineType.SPARK},
disabledReason =
- "There is a conflict of thrift version between IoTDB and Spark.Therefore. Refactor starter module, so disabled in flink")
+ "There is a conflict of thrift version between IoTDB and Spark.Therefore. Refactor starter module, so disabled in spark")
public class IoTDBIT extends TestSuiteBase implements TestResource {
private static final String IOTDB_DOCKER_IMAGE = "apache/iotdb:0.13.1-node";