diff --git a/docs/en/concept/connector-v2-features.md b/docs/en/concept/connector-v2-features.md index ad8433453fc..83b24edebf4 100644 --- a/docs/en/concept/connector-v2-features.md +++ b/docs/en/concept/connector-v2-features.md @@ -69,3 +69,7 @@ For sink connector, the sink connector supports exactly-once if any piece of dat ### cdc(change data capture) If a sink connector supports writing row kinds(INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE) based on primary key, we think it supports cdc(change data capture). + +### support multiple table write + +Supports write multiple tables in one SeaTunnel job, users can dynamically specify the table's identifier by [configuring placeholders](./sink-options-placeholders.md). diff --git a/docs/en/concept/sink-options-placeholders.md b/docs/en/concept/sink-options-placeholders.md new file mode 100644 index 00000000000..88eada299fc --- /dev/null +++ b/docs/en/concept/sink-options-placeholders.md @@ -0,0 +1,110 @@ +# Sink Options Placeholders + +## Introduction + +The SeaTunnel provides a sink options placeholders feature that allows you to get upstream table metadata through placeholders. + +This functionality is essential when you need to dynamically get upstream table metadata (such as multi-table writes). + +This document will guide you through the usage of these placeholders and how to leverage them effectively. + +## Support Those Engines + +> SeaTunnel Zeta
+> Flink
+> Spark
+ +## Placeholder + +The placeholders are mainly controlled by the following expressions: + +- `${database_name}` + - Used to get the database in the upstream catalog table + - Default values can also be specified via expressions:`${database_name:default_my_db}` +- `${schema_name}` + - Used to get the schema in the upstream catalog table + - Default values can also be specified via expressions:`${schema_name:default_my_schema}` +- `${table_name}` + - Used to get the table in the upstream catalog table + - Default values can also be specified via expressions:`${table_name:default_my_table}` +- `${schema_full_name}` + - Used to get the schema full path(database & schema) in the upstream catalog table +- `${table_full_name}` + - Used to get the table full path(database & schema & table) in the upstream catalog table +- `${primary_key}` + - Used to get the table primary-key fields in the upstream catalog table +- `${unique_key}` + - Used to get the table unique-key fields in the upstream catalog table +- `${field_names}` + - Used to get the table field keys in the upstream catalog table + +## Configuration + +*Requires*: +- Make sure the sink connector you are using has implemented `TableSinkFactory` API + +### Example 1 + +```hocon +env { + // ignore... +} +source { + MySQL-CDC { + // ignore... + } +} + +transform { + // ignore... +} + +sink { + jdbc { + url = "jdbc:mysql://localhost:3306" + driver = "com.mysql.cj.jdbc.Driver" + user = "root" + password = "123456" + + database = "${database_name}_test" + table = "${table_name}_test" + primary_keys = ["${primary_key}"] + } +} +``` + +### Example 2 + +```hocon +env { + // ignore... +} +source { + Oracle-CDC { + // ignore... + } +} + +transform { + // ignore... +} + +sink { + jdbc { + url = "jdbc:mysql://localhost:3306" + driver = "com.mysql.cj.jdbc.Driver" + user = "root" + password = "123456" + + database = "${schema_name}_test" + table = "${table_name}_test" + primary_keys = ["${primary_key}"] + } +} +``` + +We will complete the placeholder replacement before the connector is started, ensuring that the sink options is ready before use. +If the variable is not replaced, it may be that the upstream table metadata is missing this option, for example: +- `mysql` source not contain `${schema_name}` +- `oracle` source not contain `${databse_name}` +- ... diff --git a/docs/en/connector-v2/sink/Doris.md b/docs/en/connector-v2/sink/Doris.md index 8c6de2977b7..592cd8702be 100644 --- a/docs/en/connector-v2/sink/Doris.md +++ b/docs/en/connector-v2/sink/Doris.md @@ -18,6 +18,7 @@ - [x] [exactly-once](../../concept/connector-v2-features.md) - [x] [cdc](../../concept/connector-v2-features.md) +- [x] [support multiple table write](../../concept/connector-v2-features.md) ## Description @@ -76,7 +77,7 @@ and the default template can be modified according to the situation. Default template: ```sql -CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` ( +CREATE TABLE IF NOT EXISTS `${database}`.`${table}` ( ${rowtype_primary_key}, ${rowtype_fields} ) ENGINE=OLAP @@ -93,7 +94,7 @@ DISTRIBUTED BY HASH (${rowtype_primary_key}) If a custom field is filled in the template, such as adding an `id` field ```sql -CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` +CREATE TABLE IF NOT EXISTS `${database}`.`${table}` ( id, ${rowtype_fields} @@ -323,6 +324,95 @@ sink { } ``` +### Multiple table + +#### example1 + +```hocon +env { + parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 +} + +source { + Mysql-CDC { + base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel" + username = "root" + password = "******" + + table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"] + } +} + +transform { +} + +sink { + Doris { + fenodes = "doris_cdc_e2e:8030" + username = root + password = "" + database = "${database_name}_test" + table = "${table_name}_test" + sink.label-prefix = "test-cdc" + sink.enable-2pc = "true" + sink.enable-delete = "true" + doris.config { + format = "json" + read_json_by_line = "true" + } + } +} +``` + +#### example2 + +```hocon +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + Jdbc { + driver = oracle.jdbc.driver.OracleDriver + url = "jdbc:oracle:thin:@localhost:1521/XE" + user = testUser + password = testPassword + + table_list = [ + { + table_path = "TESTSCHEMA.TABLE_1" + }, + { + table_path = "TESTSCHEMA.TABLE_2" + } + ] + } +} + +transform { +} + +sink { + Doris { + fenodes = "doris_cdc_e2e:8030" + username = root + password = "" + database = "${schema_name}_test" + table = "${table_name}_test" + sink.label-prefix = "test-cdc" + sink.enable-2pc = "true" + sink.enable-delete = "true" + doris.config { + format = "json" + read_json_by_line = "true" + } + } +} +``` + ## Changelog ### 2.3.0-beta 2022-10-20 diff --git a/docs/en/connector-v2/sink/Druid.md b/docs/en/connector-v2/sink/Druid.md index 0d4783b03ab..2c1a2fe25dd 100644 --- a/docs/en/connector-v2/sink/Druid.md +++ b/docs/en/connector-v2/sink/Druid.md @@ -9,6 +9,7 @@ Write data to Druid ## Key features - [ ] [exactly-once](../../concept/connector-v2-features.md) +- [x] [support multiple table write](../../concept/connector-v2-features.md) ## Data Type Mapping @@ -52,10 +53,25 @@ Sink plugin common parameters, please refer to [Sink Common Options](common-opti ## Example +Simple example: + +```hocon +sink { + Druid { + coordinatorUrl = "testHost:8888" + datasource = "seatunnel" + } +} +``` + +Use placeholders get upstream table metadata example: + ```hocon -Druid { - coordinatorUrl = "testHost:8888" - datasource = "seatunnel" +sink { + Druid { + coordinatorUrl = "testHost:8888" + datasource = "${table_name}_test" + } } ``` diff --git a/docs/en/connector-v2/sink/Hive.md b/docs/en/connector-v2/sink/Hive.md index 023bb38ddb1..e3c62294ee6 100644 --- a/docs/en/connector-v2/sink/Hive.md +++ b/docs/en/connector-v2/sink/Hive.md @@ -15,6 +15,7 @@ If you use SeaTunnel Engine, You need put seatunnel-hadoop3-3.1.4-uber.jar and h ## Key features +- [x] [support multiple table write](../../concept/connector-v2-features.md) - [x] [exactly-once](../../concept/connector-v2-features.md) By default, we use 2PC commit to ensure `exactly-once` diff --git a/docs/en/connector-v2/sink/Http.md b/docs/en/connector-v2/sink/Http.md index 1eb89af0d00..59f80514cbd 100644 --- a/docs/en/connector-v2/sink/Http.md +++ b/docs/en/connector-v2/sink/Http.md @@ -12,6 +12,7 @@ - [ ] [exactly-once](../../concept/connector-v2-features.md) - [ ] [cdc](../../concept/connector-v2-features.md) +- [x] [support multiple table write](../../concept/connector-v2-features.md) ## Description @@ -56,6 +57,75 @@ Http { } ``` +### Multiple table + +#### example1 + +```hocon +env { + parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 +} + +source { + Mysql-CDC { + base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel" + username = "root" + password = "******" + + table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"] + } +} + +transform { +} + +sink { + Http { + ... + url = "http://localhost/test/${database_name}_test/${table_name}_test" + } +} +``` + +#### example2 + +```hocon +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + Jdbc { + driver = oracle.jdbc.driver.OracleDriver + url = "jdbc:oracle:thin:@localhost:1521/XE" + user = testUser + password = testPassword + + table_list = [ + { + table_path = "TESTSCHEMA.TABLE_1" + }, + { + table_path = "TESTSCHEMA.TABLE_2" + } + ] + } +} + +transform { +} + +sink { + Http { + ... + url = "http://localhost/test/${schema_name}_test/${table_name}_test" + } +} +``` + ## Changelog ### 2.2.0-beta 2022-09-26 diff --git a/docs/en/connector-v2/sink/Hudi.md b/docs/en/connector-v2/sink/Hudi.md index 51c588e18ff..406212ca853 100644 --- a/docs/en/connector-v2/sink/Hudi.md +++ b/docs/en/connector-v2/sink/Hudi.md @@ -10,6 +10,7 @@ Used to write data to Hudi. - [x] [exactly-once](../../concept/connector-v2-features.md) - [x] [cdc](../../concept/connector-v2-features.md) +- [x] [support multiple table write](../../concept/connector-v2-features.md) ## Options @@ -76,17 +77,49 @@ Source plugin common parameters, please refer to [Source Common Options](common- ## Examples ```hocon -source { - +sink { Hudi { table_dfs_path = "hdfs://nameserivce/data/hudi/hudi_table/" + table_name = "test_table" table_type = "copy_on_write" conf_files_path = "/home/test/hdfs-site.xml;/home/test/core-site.xml;/home/test/yarn-site.xml" use.kerberos = true kerberos.principal = "test_user@xxx" kerberos.principal.file = "/home/test/test_user.keytab" } +} +``` + +### Multiple table + +#### example1 + +```hocon +env { + parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 +} + +source { + Mysql-CDC { + base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel" + username = "root" + password = "******" + + table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"] + } +} +transform { +} + +sink { + Hudi { + ... + table_dfs_path = "hdfs://nameserivce/data/hudi/hudi_table/" + table_name = "${table_name}_test" + } } ``` diff --git a/docs/en/connector-v2/sink/Iceberg.md b/docs/en/connector-v2/sink/Iceberg.md index 3aa24a0a636..721c5ea7c08 100644 --- a/docs/en/connector-v2/sink/Iceberg.md +++ b/docs/en/connector-v2/sink/Iceberg.md @@ -16,6 +16,10 @@ Sink connector for Apache Iceberg. It can support cdc mode 、auto create table and table schema evolution. +## Key features + +- [x] [support multiple table write](../../concept/connector-v2-features.md) + ## Supported DataSource Info | Datasource | Dependent | Maven | @@ -173,6 +177,77 @@ sink { ``` +### Multiple table + +#### example1 + +```hocon +env { + parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 +} + +source { + Mysql-CDC { + base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel" + username = "root" + password = "******" + + table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"] + } +} + +transform { +} + +sink { + Iceberg { + ... + namespace = "${database_name}_test" + table = "${table_name}_test" + } +} +``` + +#### example2 + +```hocon +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + Jdbc { + driver = oracle.jdbc.driver.OracleDriver + url = "jdbc:oracle:thin:@localhost:1521/XE" + user = testUser + password = testPassword + + table_list = [ + { + table_path = "TESTSCHEMA.TABLE_1" + }, + { + table_path = "TESTSCHEMA.TABLE_2" + } + ] + } +} + +transform { +} + +sink { + Iceberg { + ... + namespace = "${schema_name}_test" + table = "${table_name}_test" + } +} +``` + ## Changelog ### 2.3.4-SNAPSHOT 2024-01-18 diff --git a/docs/en/connector-v2/sink/InfluxDB.md b/docs/en/connector-v2/sink/InfluxDB.md index 1dba1fbe4dc..e899840b0fa 100644 --- a/docs/en/connector-v2/sink/InfluxDB.md +++ b/docs/en/connector-v2/sink/InfluxDB.md @@ -9,6 +9,7 @@ Write data to InfluxDB. ## Key features - [ ] [exactly-once](../../concept/connector-v2-features.md) +- [x] [support multiple table write](../../concept/connector-v2-features.md) ## Options @@ -100,6 +101,39 @@ sink { ``` +### Multiple table + +#### example1 + +```hocon +env { + parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 +} + +source { + Mysql-CDC { + base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel" + username = "root" + password = "******" + + table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"] + } +} + +transform { +} + +sink { + InfluxDB { + url = "http://influxdb-host:8086" + database = "test" + measurement = "${table_name}_test" + } +} +``` + ## Changelog ### next version diff --git a/docs/en/connector-v2/sink/Jdbc.md b/docs/en/connector-v2/sink/Jdbc.md index dd2ebba19ed..aa13c86c58f 100644 --- a/docs/en/connector-v2/sink/Jdbc.md +++ b/docs/en/connector-v2/sink/Jdbc.md @@ -25,6 +25,7 @@ Use `Xa transactions` to ensure `exactly-once`. So only support `exactly-once` f support `Xa transactions`. You can set `is_exactly_once=true` to enable it. - [x] [cdc](../../concept/connector-v2-features.md) +- [x] [support multiple table write](../../concept/connector-v2-features.md) ## Options @@ -336,6 +337,89 @@ sink { ``` +### Multiple table + +#### example1 + +```hocon +env { + parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 +} + +source { + Mysql-CDC { + base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel" + username = "root" + password = "******" + + table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"] + } +} + +transform { +} + +sink { + jdbc { + url = "jdbc:mysql://localhost:3306" + driver = "com.mysql.cj.jdbc.Driver" + user = "root" + password = "123456" + generate_sink_sql = true + + database = "${database_name}_test" + table = "${table_name}_test" + primary_keys = ["${primary_key}"] + } +} +``` + +#### example2 + +```hocon +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + Jdbc { + driver = oracle.jdbc.driver.OracleDriver + url = "jdbc:oracle:thin:@localhost:1521/XE" + user = testUser + password = testPassword + + table_list = [ + { + table_path = "TESTSCHEMA.TABLE_1" + }, + { + table_path = "TESTSCHEMA.TABLE_2" + } + ] + } +} + +transform { +} + +sink { + jdbc { + url = "jdbc:mysql://localhost:3306" + driver = "com.mysql.cj.jdbc.Driver" + user = "root" + password = "123456" + generate_sink_sql = true + + database = "${schema_name}_test" + table = "${table_name}_test" + primary_keys = ["${primary_key}"] + } +} +``` + ## Changelog ### 2.2.0-beta 2022-09-26 diff --git a/docs/en/connector-v2/sink/Kudu.md b/docs/en/connector-v2/sink/Kudu.md index aa43a72522d..aea1a917fb1 100644 --- a/docs/en/connector-v2/sink/Kudu.md +++ b/docs/en/connector-v2/sink/Kudu.md @@ -16,6 +16,7 @@ - [ ] [exactly-once](../../concept/connector-v2-features.md) - [x] [cdc](../../concept/connector-v2-features.md) +- [x] [support multiple table write](../../concept/connector-v2-features.md) ## Data Type Mapping @@ -123,75 +124,72 @@ sink { } ``` -### Multiple Table +### Multiple table + +#### example1 + +```hocon +env { + parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 +} + +source { + Mysql-CDC { + base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel" + username = "root" + password = "******" + + table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"] + } +} + +transform { +} + +sink { + kudu{ + kudu_masters = "kudu-master-cdc:7051" + table_name = "${database_name}_${table_name}_test" + } +} +``` + +#### example2 ```hocon env { - # You can set engine configuration here parallelism = 1 job.mode = "BATCH" } source { - FakeSource { - tables_configs = [ - { - schema = { - table = "kudu_sink_1" - fields { - id = int - val_bool = boolean - val_int8 = tinyint - val_int16 = smallint - val_int32 = int - val_int64 = bigint - val_float = float - val_double = double - val_decimal = "decimal(16, 1)" - val_string = string - val_unixtime_micros = timestamp - } - } - rows = [ - { - kind = INSERT - fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] - } - ] - }, - { - schema = { - table = "kudu_sink_2" - fields { - id = int - val_bool = boolean - val_int8 = tinyint - val_int16 = smallint - val_int32 = int - val_int64 = bigint - val_float = float - val_double = double - val_decimal = "decimal(16, 1)" - val_string = string - val_unixtime_micros = timestamp - } - } - rows = [ - { - kind = INSERT - fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] - } - ] + Jdbc { + driver = oracle.jdbc.driver.OracleDriver + url = "jdbc:oracle:thin:@localhost:1521/XE" + user = testUser + password = testPassword + + table_list = [ + { + table_path = "TESTSCHEMA.TABLE_1" + }, + { + table_path = "TESTSCHEMA.TABLE_2" } ] } } +transform { +} sink { - kudu{ - kudu_masters = "kudu-master-multiple:7051" - } + kudu{ + kudu_masters = "kudu-master-cdc:7051" + table_name = "${schema_name}_${table_name}_test" + } } ``` diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md index b0d41419d50..a0bb53ff1d6 100644 --- a/docs/en/connector-v2/sink/LocalFile.md +++ b/docs/en/connector-v2/sink/LocalFile.md @@ -17,6 +17,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you ## Key Features - [x] [exactly-once](../../concept/connector-v2-features.md) +- [x] [support multiple table write](../../concept/connector-v2-features.md) By default, we use 2PC commit to ensure `exactly-once` diff --git a/docs/en/connector-v2/sink/OssFile.md b/docs/en/connector-v2/sink/OssFile.md index aef2bb11c09..f83fdcf4997 100644 --- a/docs/en/connector-v2/sink/OssFile.md +++ b/docs/en/connector-v2/sink/OssFile.md @@ -22,6 +22,7 @@ ## Key features - [x] [exactly-once](../../concept/connector-v2-features.md) +- [x] [support multiple table write](../../concept/connector-v2-features.md) By default, we use 2PC commit to ensure `exactly-once` @@ -509,7 +510,6 @@ sink { compress_codec = "lzo" } } - ``` ## Changelog diff --git a/docs/en/connector-v2/sink/Paimon.md b/docs/en/connector-v2/sink/Paimon.md index d79d7c9b004..58978cc20c2 100644 --- a/docs/en/connector-v2/sink/Paimon.md +++ b/docs/en/connector-v2/sink/Paimon.md @@ -27,6 +27,7 @@ libfb303-xxx.jar ## Key features - [x] [exactly-once](../../concept/connector-v2-features.md) +- [x] [support multiple table write](../../concept/connector-v2-features.md) ## Options @@ -242,6 +243,8 @@ sink { ### Multiple table +#### example1 + ```hocon env { parallelism = 1 @@ -254,6 +257,7 @@ source { base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel" username = "root" password = "******" + table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"] } } @@ -265,8 +269,47 @@ sink { Paimon { catalog_name="seatunnel_test" warehouse="file:///tmp/seatunnel/paimon/hadoop-sink/" - database="${database_name}" - table="${table_name}" + database="${database_name}_test" + table="${table_name}_test" + } +} +``` + +#### example2 + +```hocon +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + Jdbc { + driver = oracle.jdbc.driver.OracleDriver + url = "jdbc:oracle:thin:@localhost:1521/XE" + user = testUser + password = testPassword + + table_list = [ + { + table_path = "TESTSCHEMA.TABLE_1" + }, + { + table_path = "TESTSCHEMA.TABLE_2" + } + ] + } +} + +transform { +} + +sink { + Paimon { + catalog_name="seatunnel_test" + warehouse="file:///tmp/seatunnel/paimon/hadoop-sink/" + database="${schema_name}_test" + table="${table_name}_test" } } ``` diff --git a/docs/en/connector-v2/sink/S3File.md b/docs/en/connector-v2/sink/S3File.md index c25975a8603..cb711f6b3b7 100644 --- a/docs/en/connector-v2/sink/S3File.md +++ b/docs/en/connector-v2/sink/S3File.md @@ -12,6 +12,7 @@ - [x] [exactly-once](../../concept/connector-v2-features.md) - [ ] [cdc](../../concept/connector-v2-features.md) +- [x] [support multiple table write](../../concept/connector-v2-features.md) By default, we use 2PC commit to ensure `exactly-once` @@ -445,45 +446,34 @@ For orc file format simple config with `org.apache.hadoop.fs.s3a.SimpleAWSCreden Multi-table writing and saveMode -``` +```hocon env { -"job.name"="SeaTunnel_job" -"job.mode"=STREAMING + "job.name"="SeaTunnel_job" + "job.mode"=STREAMING } source { -MySQL-CDC { - - "connect.max-retries"=3 - "connection.pool.size"=6 - "startup.mode"=INITIAL - "exactly_once"="true" - "stop.mode"=NEVER - parallelism=1 - "result_table_name"=Table11519548644512 - "dag-parsing.mode"=MULTIPLEX - catalog { - factory=Mysql - } - database-names=[ - "wls_t1" - ] - table-names=[ - "wls_t1.mysqlcdc_to_s3_t3", - "wls_t1.mysqlcdc_to_s3_t4", - "wls_t1.mysqlcdc_to_s3_t5", - "wls_t1.mysqlcdc_to_s3_t1", - "wls_t1.mysqlcdc_to_s3_t2" - ] - password="xxxxxx" - username="xxxxxxxxxxxxx" - base-url="jdbc:mysql://localhost:3306/qa_source" - server-time-zone=UTC -} + MySQL-CDC { + database-names=[ + "wls_t1" + ] + table-names=[ + "wls_t1.mysqlcdc_to_s3_t3", + "wls_t1.mysqlcdc_to_s3_t4", + "wls_t1.mysqlcdc_to_s3_t5", + "wls_t1.mysqlcdc_to_s3_t1", + "wls_t1.mysqlcdc_to_s3_t2" + ] + password="xxxxxx" + username="xxxxxxxxxxxxx" + base-url="jdbc:mysql://localhost:3306/qa_source" + } } + transform { } + sink { -S3File { + S3File { bucket = "s3a://seatunnel-test" tmp_path = "/tmp/seatunnel/${table_name}" path="/test/${table_name}" diff --git a/docs/en/connector-v2/sink/StarRocks.md b/docs/en/connector-v2/sink/StarRocks.md index b6dc18e8eab..5fe57cd3f4e 100644 --- a/docs/en/connector-v2/sink/StarRocks.md +++ b/docs/en/connector-v2/sink/StarRocks.md @@ -12,6 +12,7 @@ - [ ] [exactly-once](../../concept/connector-v2-features.md) - [x] [cdc](../../concept/connector-v2-features.md) +- [x] [support multiple table write](../../concept/connector-v2-features.md) ## Description @@ -51,7 +52,7 @@ and the default template can be modified according to the situation. Only work o Default template: ```sql -CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` ( +CREATE TABLE IF NOT EXISTS `${database}`.`${table}` ( ${rowtype_primary_key}, ${rowtype_fields} ) ENGINE=OLAP @@ -64,7 +65,7 @@ DISTRIBUTED BY HASH (${rowtype_primary_key})PROPERTIES ( If a custom field is filled in the template, such as adding an `id` field ```sql -CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` +CREATE TABLE IF NOT EXISTS `${database}`.`${table}` ( id, ${rowtype_fields} @@ -283,6 +284,89 @@ sink { } ``` +### Multiple table + +#### example1 + +```hocon +env { + parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 +} + +source { + Mysql-CDC { + base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel" + username = "root" + password = "******" + + table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"] + } +} + +transform { +} + +sink { + StarRocks { + nodeUrls = ["e2e_starRocksdb:8030"] + username = root + password = "" + database = "${database_name}_test" + table = "${table_name}_test" + ... + + // Support upsert/delete event synchronization (enable_upsert_delete=true), only supports PrimaryKey model. + enable_upsert_delete = true + } +} +``` + +#### example2 + +```hocon +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + Jdbc { + driver = oracle.jdbc.driver.OracleDriver + url = "jdbc:oracle:thin:@localhost:1521/XE" + user = testUser + password = testPassword + + table_list = [ + { + table_path = "TESTSCHEMA.TABLE_1" + }, + { + table_path = "TESTSCHEMA.TABLE_2" + } + ] + } +} + +transform { +} + +sink { + StarRocks { + nodeUrls = ["e2e_starRocksdb:8030"] + username = root + password = "" + database = "${schema_name}_test" + table = "${table_name}_test" + ... + + // Support upsert/delete event synchronization (enable_upsert_delete=true), only supports PrimaryKey model. + enable_upsert_delete = true + } +} +``` + ## Changelog ### next version diff --git a/docs/zh/concept/sink-options-placeholders.md b/docs/zh/concept/sink-options-placeholders.md new file mode 100644 index 00000000000..2553feb549f --- /dev/null +++ b/docs/zh/concept/sink-options-placeholders.md @@ -0,0 +1,110 @@ +# Sink 参数占位符 + +## 介绍 + +SeaTunnel 提供了 Sink 参数占位符自动替换功能,可让您通过占位符获取上游表元数据。 + +当您需要动态获取上游表元数据(例如多表写入)时,此功能至关重要。 + +本文档将指导您如何使用这些占位符以及如何有效地利用它们。 + +## 支持的引擎 + +> SeaTunnel Zeta
+> Flink
+> Spark
+ +## 占位符变量 + +占位符主要通过以下表达式实现: + +- `${database_name}` + - 用于获取上游表中的数据库名称 + - 也可以通过表达式指定默认值:`${database_name:default_my_db}` +- `${schema_name}` + - 用于获取上游表中的 schema 名称 + - 也可以通过表达式指定默认值:`${schema_name:default_my_schema}` +- `${table_name}` + - 用于获取上游表中的 table 名称 + - 也可以通过表达式指定默认值:`${table_name:default_my_table}` +- `${schema_full_name}` + - 用于获取上游表中的 schema 全路径名称,包含 database/schema 名称 +- `${table_full_name}` + - 用于获取上游表中的 table 全路径名称,包含 database/schema/table 名称 +- `${primary_key}` + - 用于获取上游表中的主键字段名称列表 +- `${unique_key}` + - 用于获取上游表中的唯一键字段名称列表 +- `${field_names}` + - 用于获取上游表中的所有字段名称列表 + +## 配置 + +*先决条件*: +- 确认 Sink 连接器已经支持了 `TableSinkFactory` API + +### 配置示例 1 + +```hocon +env { + // ignore... +} +source { + MySQL-CDC { + // ignore... + } +} + +transform { + // ignore... +} + +sink { + jdbc { + url = "jdbc:mysql://localhost:3306" + driver = "com.mysql.cj.jdbc.Driver" + user = "root" + password = "123456" + + database = "${database_name}_test" + table = "${table_name}_test" + primary_keys = ["${primary_key}"] + } +} +``` + +### 配置示例 2 + +```hocon +env { + // ignore... +} +source { + Oracle-CDC { + // ignore... + } +} + +transform { + // ignore... +} + +sink { + jdbc { + url = "jdbc:mysql://localhost:3306" + driver = "com.mysql.cj.jdbc.Driver" + user = "root" + password = "123456" + + database = "${schema_name}_test" + table = "${table_name}_test" + primary_keys = ["${primary_key}"] + } +} +``` + +占位符的替换将在连接器启动之前完成,确保 Sink 参数在使用前已准备就绪。 +若该占位符变量没有被替换,则可能是上游表元数据缺少该选项,例如: +- `mysql` source 连接器不包含 `${schema_name}` 元数据 +- `oracle` source 连接器不包含 `${databse_name}` 元数据 +- ... diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModePlaceHolder.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModePlaceHolder.java index bea1455bcb3..02b72faffb2 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModePlaceHolder.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModePlaceHolder.java @@ -25,8 +25,11 @@ public enum SaveModePlaceHolder { ROWTYPE_UNIQUE_KEY("rowtype_unique_key", "unique keys"), ROWTYPE_DUPLICATE_KEY("rowtype_duplicate_key", "duplicate keys"), ROWTYPE_FIELDS("rowtype_fields", "fields"), - TABLE_NAME("table_name", "table name"), - DATABASE("database", "database"); + TABLE("table", "table"), + DATABASE("database", "database"), + /** @deprecated instead by {@link #TABLE} todo remove this enum */ + @Deprecated + TABLE_NAME("table_name", "table name"); private String keyValue; private String display; diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkReplaceNameConstant.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkReplaceNameConstant.java index f3bc08b0e1f..0291c2760cc 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkReplaceNameConstant.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkReplaceNameConstant.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.api.sink; +/** @deprecated instead by {@link TablePlaceholder} todo remove this class */ +@Deprecated public final class SinkReplaceNameConstant { public static final String REPLACE_TABLE_NAME_KEY = "${table_name}"; diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/TablePlaceholder.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/TablePlaceholder.java new file mode 100644 index 00000000000..f599e221350 --- /dev/null +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/TablePlaceholder.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.api.sink; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.ConstraintKey; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TableSchema; + +import org.apache.commons.lang3.ObjectUtils; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +public class TablePlaceholder { + // Placeholder ${database_name} or ${database_name:default_value} + public static final String REPLACE_DATABASE_NAME_KEY = "database_name"; + // Placeholder ${schema_name} or ${schema_name:default_value} + public static final String REPLACE_SCHEMA_NAME_KEY = "schema_name"; + // Placeholder ${schema_full_name} or ${schema_full_name:default_value} + public static final String REPLACE_SCHEMA_FULL_NAME_KEY = "schema_full_name"; + // Placeholder ${table_name} or ${table_name:default_value} + public static final String REPLACE_TABLE_NAME_KEY = "table_name"; + // Placeholder ${table_full_name} or ${table_full_name:default_value} + public static final String REPLACE_TABLE_FULL_NAME_KEY = "table_full_name"; + // Placeholder ${primary_key} or ${primary_key:default_value} + public static final String REPLACE_PRIMARY_KEY = "primary_key"; + // Placeholder ${unique_key} or ${unique_key:default_value} + public static final String REPLACE_UNIQUE_KEY = "unique_key"; + // Placeholder ${field_names} or ${field_names:default_value} + public static final String REPLACE_FIELD_NAMES_KEY = "field_names"; + public static final String NAME_DELIMITER = "."; + public static final String FIELD_DELIMITER = ","; + + private static String replacePlaceholders(String input, String placeholderName, String value) { + return replacePlaceholders(input, placeholderName, value, null); + } + + private static String replacePlaceholders( + String input, String placeholderName, String value, String defaultValue) { + String placeholderRegex = "\\$\\{" + Pattern.quote(placeholderName) + "(:[^}]*)?\\}"; + Pattern pattern = Pattern.compile(placeholderRegex); + Matcher matcher = pattern.matcher(input); + + StringBuffer result = new StringBuffer(); + while (matcher.find()) { + String replacement = + value != null && !value.isEmpty() + ? value + : (matcher.group(1) != null + ? matcher.group(1).substring(1).trim() + : defaultValue); + if (replacement == null) { + continue; + } + matcher.appendReplacement(result, Matcher.quoteReplacement(replacement)); + } + matcher.appendTail(result); + return result.toString(); + } + + private static String replaceTableIdentifier( + String placeholder, TableIdentifier identifier, String defaultValue) { + placeholder = + replacePlaceholders( + placeholder, + REPLACE_DATABASE_NAME_KEY, + identifier.getDatabaseName(), + defaultValue); + placeholder = + replacePlaceholders( + placeholder, + REPLACE_SCHEMA_NAME_KEY, + identifier.getSchemaName(), + defaultValue); + placeholder = + replacePlaceholders( + placeholder, + REPLACE_TABLE_NAME_KEY, + identifier.getTableName(), + defaultValue); + + List fullPath = new ArrayList<>(); + if (identifier.getDatabaseName() != null) { + fullPath.add(identifier.getDatabaseName()); + } + if (identifier.getSchemaName() != null) { + fullPath.add(identifier.getSchemaName()); + } + if (!fullPath.isEmpty()) { + placeholder = + replacePlaceholders( + placeholder, + REPLACE_SCHEMA_FULL_NAME_KEY, + String.join(NAME_DELIMITER, fullPath), + defaultValue); + } + + if (identifier.getTableName() != null) { + fullPath.add(identifier.getTableName()); + } + if (!fullPath.isEmpty()) { + placeholder = + replacePlaceholders( + placeholder, + REPLACE_TABLE_FULL_NAME_KEY, + String.join(NAME_DELIMITER, fullPath), + defaultValue); + } + return placeholder; + } + + public static String replaceTableIdentifier(String placeholder, TableIdentifier identifier) { + return replaceTableIdentifier(placeholder, identifier, ""); + } + + public static String replaceTablePrimaryKey(String placeholder, PrimaryKey primaryKey) { + if (primaryKey != null && !primaryKey.getColumnNames().isEmpty()) { + String pkFieldsString = String.join(FIELD_DELIMITER, primaryKey.getColumnNames()); + return replacePlaceholders(placeholder, REPLACE_PRIMARY_KEY, pkFieldsString); + } + return placeholder; + } + + public static String replaceTableUniqueKey( + String placeholder, List constraintKeys) { + Optional ukFieldsString = + constraintKeys.stream() + .filter( + e -> + e.getConstraintType() + .equals(ConstraintKey.ConstraintType.UNIQUE_KEY)) + .findFirst() + .map( + e -> + e.getColumnNames().stream() + .map(f -> f.getColumnName()) + .collect(Collectors.joining(FIELD_DELIMITER))); + if (ukFieldsString.isPresent()) { + return replacePlaceholders(placeholder, REPLACE_UNIQUE_KEY, ukFieldsString.get()); + } + return placeholder; + } + + public static String replaceTableFieldNames(String placeholder, TableSchema schema) { + return replacePlaceholders( + placeholder, + REPLACE_FIELD_NAMES_KEY, + String.join(FIELD_DELIMITER, schema.getFieldNames())); + } + + public static ReadonlyConfig replaceTablePlaceholder( + ReadonlyConfig config, CatalogTable table) { + return replaceTablePlaceholder(config, table, Collections.emptyList()); + } + + public static ReadonlyConfig replaceTablePlaceholder( + ReadonlyConfig config, CatalogTable table, Collection excludeKeys) { + Map copyOnWriteData = ObjectUtils.clone(config.getSourceMap()); + for (String key : copyOnWriteData.keySet()) { + if (excludeKeys.contains(key)) { + continue; + } + Object value = copyOnWriteData.get(key); + if (value != null) { + if (value instanceof String) { + String strValue = (String) value; + strValue = replaceTableIdentifier(strValue, table.getTableId()); + strValue = + replaceTablePrimaryKey( + strValue, table.getTableSchema().getPrimaryKey()); + strValue = + replaceTableUniqueKey( + strValue, table.getTableSchema().getConstraintKeys()); + strValue = replaceTableFieldNames(strValue, table.getTableSchema()); + copyOnWriteData.put(key, strValue); + } else if (value instanceof List) { + List listValue = (List) value; + if (listValue.size() == 1 && listValue.get(0) instanceof String) { + String strValue = (String) listValue.get(0); + if (strValue.equals("${" + REPLACE_PRIMARY_KEY + "}")) { + strValue = + replaceTablePrimaryKey( + strValue, table.getTableSchema().getPrimaryKey()); + listValue = Arrays.asList(strValue.split(FIELD_DELIMITER)); + } else if (strValue.equals("${" + REPLACE_UNIQUE_KEY + "}")) { + strValue = + replaceTableUniqueKey( + strValue, table.getTableSchema().getConstraintKeys()); + listValue = Arrays.asList(strValue.split(FIELD_DELIMITER)); + } else if (strValue.equals("${" + REPLACE_FIELD_NAMES_KEY + "}")) { + strValue = replaceTableFieldNames(strValue, table.getTableSchema()); + listValue = Arrays.asList(strValue.split(FIELD_DELIMITER)); + } + copyOnWriteData.put(key, listValue); + } + } + } + } + return ReadonlyConfig.fromMap(copyOnWriteData); + } +} diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java index 04e74413cf1..668ff2a43c8 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java @@ -115,15 +115,26 @@ SeaTunnelSource createAndPrepareSource( public static SeaTunnelSink createAndPrepareSink( CatalogTable catalogTable, - ReadonlyConfig options, + ReadonlyConfig config, ClassLoader classLoader, String factoryIdentifier) { try { TableSinkFactory factory = discoverFactory(classLoader, TableSinkFactory.class, factoryIdentifier); TableSinkFactoryContext context = - new TableSinkFactoryContext(catalogTable, options, classLoader); + TableSinkFactoryContext.replacePlaceholderAndCreate( + catalogTable, + config, + classLoader, + factory.excludeTablePlaceholderReplaceKeys()); ConfigValidator.of(context.getOptions()).validate(factory.optionRule()); + + LOG.info( + "Create sink '{}' with upstream input catalog-table[database: {}, schema: {}, table: {}]", + factoryIdentifier, + catalogTable.getTablePath().getDatabaseName(), + catalogTable.getTablePath().getSchemaName(), + catalogTable.getTablePath().getTableName()); return factory.createSink(context).createSink(); } catch (Throwable t) { throw new FactoryException( diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java index 97fba1f256a..5ba125854b3 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java @@ -19,6 +19,9 @@ import org.apache.seatunnel.api.table.connector.TableSink; +import java.util.Collections; +import java.util.List; + /** * This is an SPI interface, used to create {@link TableSink}. Each plugin need to have it own * implementation. @@ -41,4 +44,9 @@ default TableSink createSink( throw new UnsupportedOperationException( "The Factory has not been implemented and the deprecated Plugin will be used."); } + + @Deprecated + default List excludeTablePlaceholderReplaceKeys() { + return Collections.emptyList(); + } } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactoryContext.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactoryContext.java index f579adc4165..9565bad6a03 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactoryContext.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactoryContext.java @@ -18,18 +18,32 @@ package org.apache.seatunnel.api.table.factory; import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.sink.TablePlaceholder; import org.apache.seatunnel.api.table.catalog.CatalogTable; import lombok.Getter; +import java.util.Collection; + @Getter public class TableSinkFactoryContext extends TableFactoryContext { private final CatalogTable catalogTable; - public TableSinkFactoryContext( + protected TableSinkFactoryContext( CatalogTable catalogTable, ReadonlyConfig options, ClassLoader classLoader) { super(options, classLoader); this.catalogTable = catalogTable; } + + public static TableSinkFactoryContext replacePlaceholderAndCreate( + CatalogTable catalogTable, + ReadonlyConfig options, + ClassLoader classLoader, + Collection excludeTablePlaceholderReplaceKeys) { + ReadonlyConfig rewriteConfig = + TablePlaceholder.replaceTablePlaceholder( + options, catalogTable, excludeTablePlaceholderReplaceKeys); + return new TableSinkFactoryContext(catalogTable, rewriteConfig, classLoader); + } } diff --git a/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/TablePlaceholderTest.java b/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/TablePlaceholderTest.java new file mode 100644 index 00000000000..1a87a53f97f --- /dev/null +++ b/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/TablePlaceholderTest.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.api.sink; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.ConstraintKey; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.type.BasicType; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class TablePlaceholderTest { + private static final Option DATABASE = + Options.key("database").stringType().noDefaultValue(); + private static final Option SCHEMA = + Options.key("schema").stringType().noDefaultValue(); + private static final Option TABLE = Options.key("table").stringType().noDefaultValue(); + private static final Option PRIMARY_KEY = + Options.key("primary_key").stringType().noDefaultValue(); + private static final Option> PRIMARY_KEY_ARRAY = + Options.key("primary_key_array").listType(String.class).noDefaultValue(); + private static final Option UNIQUE_KEY = + Options.key("unique_key").stringType().noDefaultValue(); + private static final Option> UNIQUE_KEY_ARRAY = + Options.key("unique_key_array").listType(String.class).noDefaultValue(); + private static final Option FIELD_NAMES = + Options.key("field_names").stringType().noDefaultValue(); + private static final Option> FIELD_NAMES_ARRAY = + Options.key("field_names_array").listType(String.class).noDefaultValue(); + + @Test + public void testSinkOptions() { + ReadonlyConfig config = createConfig(); + CatalogTable table = createTestTable(); + ReadonlyConfig newConfig = TablePlaceholder.replaceTablePlaceholder(config, table); + + Assertions.assertEquals("xyz_my-database_test", newConfig.get(DATABASE)); + Assertions.assertEquals("xyz_my-schema_test", newConfig.get(SCHEMA)); + Assertions.assertEquals("xyz_my-table_test", newConfig.get(TABLE)); + Assertions.assertEquals("f1,f2", newConfig.get(PRIMARY_KEY)); + Assertions.assertEquals("f3,f4", newConfig.get(UNIQUE_KEY)); + Assertions.assertEquals("f1,f2,f3,f4,f5", newConfig.get(FIELD_NAMES)); + Assertions.assertEquals(Arrays.asList("f1", "f2"), newConfig.get(PRIMARY_KEY_ARRAY)); + Assertions.assertEquals(Arrays.asList("f3", "f4"), newConfig.get(UNIQUE_KEY_ARRAY)); + Assertions.assertEquals( + Arrays.asList("f1", "f2", "f3", "f4", "f5"), newConfig.get(FIELD_NAMES_ARRAY)); + } + + @Test + public void testSinkOptionsWithNoTablePath() { + ReadonlyConfig config = createConfig(); + CatalogTable table = createTestTableWithNoTablePath(); + ReadonlyConfig newConfig = TablePlaceholder.replaceTablePlaceholder(config, table); + + Assertions.assertEquals("xyz_default_db_test", newConfig.get(DATABASE)); + Assertions.assertEquals("xyz_default_schema_test", newConfig.get(SCHEMA)); + Assertions.assertEquals("xyz_default_table_test", newConfig.get(TABLE)); + Assertions.assertEquals("f1,f2", newConfig.get(PRIMARY_KEY)); + Assertions.assertEquals("f3,f4", newConfig.get(UNIQUE_KEY)); + Assertions.assertEquals("f1,f2,f3,f4,f5", newConfig.get(FIELD_NAMES)); + Assertions.assertEquals(Arrays.asList("f1", "f2"), newConfig.get(PRIMARY_KEY_ARRAY)); + Assertions.assertEquals(Arrays.asList("f3", "f4"), newConfig.get(UNIQUE_KEY_ARRAY)); + Assertions.assertEquals( + Arrays.asList("f1", "f2", "f3", "f4", "f5"), newConfig.get(FIELD_NAMES_ARRAY)); + } + + @Test + public void testSinkOptionsWithExcludeKeys() { + ReadonlyConfig config = createConfig(); + CatalogTable table = createTestTableWithNoTablePath(); + ReadonlyConfig newConfig = + TablePlaceholder.replaceTablePlaceholder( + config, table, Arrays.asList(DATABASE.key())); + + Assertions.assertEquals("xyz_${database_name: default_db}_test", newConfig.get(DATABASE)); + Assertions.assertEquals("xyz_default_schema_test", newConfig.get(SCHEMA)); + Assertions.assertEquals("xyz_default_table_test", newConfig.get(TABLE)); + Assertions.assertEquals("f1,f2", newConfig.get(PRIMARY_KEY)); + Assertions.assertEquals("f3,f4", newConfig.get(UNIQUE_KEY)); + Assertions.assertEquals("f1,f2,f3,f4,f5", newConfig.get(FIELD_NAMES)); + Assertions.assertEquals(Arrays.asList("f1", "f2"), newConfig.get(PRIMARY_KEY_ARRAY)); + Assertions.assertEquals(Arrays.asList("f3", "f4"), newConfig.get(UNIQUE_KEY_ARRAY)); + Assertions.assertEquals( + Arrays.asList("f1", "f2", "f3", "f4", "f5"), newConfig.get(FIELD_NAMES_ARRAY)); + } + + @Test + public void testSinkOptionsWithMultiTable() { + ReadonlyConfig config = createConfig(); + CatalogTable table1 = createTestTable(); + CatalogTable table2 = createTestTableWithNoTablePath(); + ReadonlyConfig newConfig1 = + TablePlaceholder.replaceTablePlaceholder(config, table1, Arrays.asList()); + ReadonlyConfig newConfig2 = + TablePlaceholder.replaceTablePlaceholder(config, table2, Arrays.asList()); + + Assertions.assertEquals("xyz_my-database_test", newConfig1.get(DATABASE)); + Assertions.assertEquals("xyz_my-schema_test", newConfig1.get(SCHEMA)); + Assertions.assertEquals("xyz_my-table_test", newConfig1.get(TABLE)); + Assertions.assertEquals("f1,f2", newConfig1.get(PRIMARY_KEY)); + Assertions.assertEquals("f3,f4", newConfig1.get(UNIQUE_KEY)); + Assertions.assertEquals("f1,f2,f3,f4,f5", newConfig1.get(FIELD_NAMES)); + Assertions.assertEquals(Arrays.asList("f1", "f2"), newConfig1.get(PRIMARY_KEY_ARRAY)); + Assertions.assertEquals(Arrays.asList("f3", "f4"), newConfig1.get(UNIQUE_KEY_ARRAY)); + Assertions.assertEquals( + Arrays.asList("f1", "f2", "f3", "f4", "f5"), newConfig1.get(FIELD_NAMES_ARRAY)); + + Assertions.assertEquals("xyz_default_db_test", newConfig2.get(DATABASE)); + Assertions.assertEquals("xyz_default_schema_test", newConfig2.get(SCHEMA)); + Assertions.assertEquals("xyz_default_table_test", newConfig2.get(TABLE)); + Assertions.assertEquals("f1,f2", newConfig2.get(PRIMARY_KEY)); + Assertions.assertEquals("f3,f4", newConfig2.get(UNIQUE_KEY)); + Assertions.assertEquals("f1,f2,f3,f4,f5", newConfig2.get(FIELD_NAMES)); + Assertions.assertEquals(Arrays.asList("f1", "f2"), newConfig2.get(PRIMARY_KEY_ARRAY)); + Assertions.assertEquals(Arrays.asList("f3", "f4"), newConfig2.get(UNIQUE_KEY_ARRAY)); + Assertions.assertEquals( + Arrays.asList("f1", "f2", "f3", "f4", "f5"), newConfig2.get(FIELD_NAMES_ARRAY)); + } + + private static ReadonlyConfig createConfig() { + Map configMap = new HashMap<>(); + configMap.put(DATABASE.key(), "xyz_${database_name: default_db}_test"); + configMap.put(SCHEMA.key(), "xyz_${schema_name: default_schema}_test"); + configMap.put(TABLE.key(), "xyz_${table_name: default_table}_test"); + configMap.put(PRIMARY_KEY.key(), "${primary_key}"); + configMap.put(UNIQUE_KEY.key(), "${unique_key}"); + configMap.put(FIELD_NAMES.key(), "${field_names}"); + configMap.put(PRIMARY_KEY_ARRAY.key(), Arrays.asList("${primary_key}")); + configMap.put(UNIQUE_KEY_ARRAY.key(), Arrays.asList("${unique_key}")); + configMap.put(FIELD_NAMES_ARRAY.key(), Arrays.asList("${field_names}")); + return ReadonlyConfig.fromMap(configMap); + } + + private static CatalogTable createTestTableWithNoTablePath() { + TableIdentifier tableId = TableIdentifier.of("my-catalog", null, null, null); + TableSchema tableSchema = + TableSchema.builder() + .primaryKey(PrimaryKey.of("my-pk", Arrays.asList("f1", "f2"))) + .constraintKey( + ConstraintKey.of( + ConstraintKey.ConstraintType.UNIQUE_KEY, + "my-uk", + Arrays.asList( + ConstraintKey.ConstraintKeyColumn.of( + "f3", ConstraintKey.ColumnSortType.ASC), + ConstraintKey.ConstraintKeyColumn.of( + "f4", ConstraintKey.ColumnSortType.ASC)))) + .column( + PhysicalColumn.builder() + .name("f1") + .dataType(BasicType.STRING_TYPE) + .build()) + .column( + PhysicalColumn.builder() + .name("f2") + .dataType(BasicType.STRING_TYPE) + .build()) + .column( + PhysicalColumn.builder() + .name("f3") + .dataType(BasicType.STRING_TYPE) + .build()) + .column( + PhysicalColumn.builder() + .name("f4") + .dataType(BasicType.STRING_TYPE) + .build()) + .column( + PhysicalColumn.builder() + .name("f5") + .dataType(BasicType.STRING_TYPE) + .build()) + .build(); + return CatalogTable.of( + tableId, tableSchema, Collections.emptyMap(), Collections.emptyList(), null); + } + + private static CatalogTable createTestTable() { + TableIdentifier tableId = + TableIdentifier.of("my-catalog", "my-database", "my-schema", "my-table"); + TableSchema tableSchema = + TableSchema.builder() + .primaryKey(PrimaryKey.of("my-pk", Arrays.asList("f1", "f2"))) + .constraintKey( + ConstraintKey.of( + ConstraintKey.ConstraintType.UNIQUE_KEY, + "my-uk", + Arrays.asList( + ConstraintKey.ConstraintKeyColumn.of( + "f3", ConstraintKey.ColumnSortType.ASC), + ConstraintKey.ConstraintKeyColumn.of( + "f4", ConstraintKey.ColumnSortType.ASC)))) + .column( + PhysicalColumn.builder() + .name("f1") + .dataType(BasicType.STRING_TYPE) + .build()) + .column( + PhysicalColumn.builder() + .name("f2") + .dataType(BasicType.STRING_TYPE) + .build()) + .column( + PhysicalColumn.builder() + .name("f3") + .dataType(BasicType.STRING_TYPE) + .build()) + .column( + PhysicalColumn.builder() + .name("f4") + .dataType(BasicType.STRING_TYPE) + .build()) + .column( + PhysicalColumn.builder() + .name("f5") + .dataType(BasicType.STRING_TYPE) + .build()) + .build(); + return CatalogTable.of( + tableId, tableSchema, Collections.emptyMap(), Collections.emptyList(), null); + } +} diff --git a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java index 6a93f83abc5..931555857de 100644 --- a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java +++ b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java @@ -33,7 +33,6 @@ import org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertRuleParser; import org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertTableRule; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; -import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; import org.apache.commons.collections4.CollectionUtils; @@ -97,7 +96,7 @@ public AssertSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) { } @Override - public AbstractSinkWriter createWriter(SinkWriter.Context context) { + public AssertSinkWriter createWriter(SinkWriter.Context context) { return new AssertSinkWriter( seaTunnelRowType, assertFieldRules, assertRowRules, assertTableRule); } diff --git a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java index d26c8196952..62ebab6a9ff 100644 --- a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java +++ b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java @@ -23,7 +23,6 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; -import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; import static org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkFactory.LOG_PRINT_DATA; import static org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkFactory.LOG_PRINT_DELAY; @@ -41,7 +40,7 @@ public ConsoleSink(SeaTunnelRowType seaTunnelRowType, ReadonlyConfig options) { } @Override - public AbstractSinkWriter createWriter(SinkWriter.Context context) { + public ConsoleSinkWriter createWriter(SinkWriter.Context context) { return new ConsoleSinkWriter(seaTunnelRowType, context, isPrintData, delayMs); } diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java index 292c8eba53e..ddf1195b6ed 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java @@ -231,7 +231,7 @@ public interface DorisOptions { "CREATE TABLE IF NOT EXISTS `" + SaveModePlaceHolder.DATABASE.getPlaceHolder() + "`.`" - + SaveModePlaceHolder.TABLE_NAME.getPlaceHolder() + + SaveModePlaceHolder.TABLE.getPlaceHolder() + "` (\n" + SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder() + ",\n" diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java index e14b64d9a28..c449dda027b 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java @@ -77,8 +77,7 @@ public void setJobContext(JobContext jobContext) { } @Override - public SinkWriter createWriter( - SinkWriter.Context context) throws IOException { + public DorisSinkWriter createWriter(SinkWriter.Context context) throws IOException { return new DorisSinkWriter( context, Collections.emptyList(), catalogTable, dorisConfig, jobId); } diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java index 34ba05d7d4d..e1849c39341 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java @@ -35,9 +35,9 @@ import com.google.auto.service.AutoService; -import static org.apache.seatunnel.api.sink.SinkReplaceNameConstant.REPLACE_DATABASE_NAME_KEY; -import static org.apache.seatunnel.api.sink.SinkReplaceNameConstant.REPLACE_SCHEMA_NAME_KEY; -import static org.apache.seatunnel.api.sink.SinkReplaceNameConstant.REPLACE_TABLE_NAME_KEY; +import java.util.Arrays; +import java.util.List; + import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DATABASE; import static org.apache.seatunnel.connectors.doris.config.DorisOptions.NEEDS_UNSUPPORTED_TYPE_CASTING; import static org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE; @@ -58,6 +58,11 @@ public OptionRule optionRule() { return DorisOptions.SINK_RULE.build(); } + @Override + public List excludeTablePlaceholderReplaceKeys() { + return Arrays.asList(DorisOptions.SAVE_MODE_CREATE_TEMPLATE.key()); + } + @Override public TableSink createSink( TableSinkFactoryContext context) { @@ -81,12 +86,12 @@ private CatalogTable renameCatalogTable(ReadonlyConfig options, CatalogTable cat databaseName = tableIdentifier.split("\\.")[0]; } else { if (StringUtils.isNotEmpty(options.get(TABLE))) { - tableName = replaceName(options.get(TABLE), tableId); + tableName = options.get(TABLE); } else { tableName = tableId.getTableName(); } if (StringUtils.isNotEmpty(options.get(DATABASE))) { - databaseName = replaceName(options.get(DATABASE), tableId); + databaseName = options.get(DATABASE); } else { databaseName = tableId.getDatabaseName(); } @@ -95,17 +100,4 @@ private CatalogTable renameCatalogTable(ReadonlyConfig options, CatalogTable cat TableIdentifier.of(tableId.getCatalogName(), databaseName, null, tableName); return CatalogTable.of(newTableId, catalogTable); } - - private String replaceName(String original, TableIdentifier tableId) { - if (tableId.getTableName() != null) { - original = original.replace(REPLACE_TABLE_NAME_KEY, tableId.getTableName()); - } - if (tableId.getSchemaName() != null) { - original = original.replace(REPLACE_SCHEMA_NAME_KEY, tableId.getSchemaName()); - } - if (tableId.getDatabaseName() != null) { - original = original.replace(REPLACE_DATABASE_NAME_KEY, tableId.getDatabaseName()); - } - return original; - } } diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java index 8cbac437187..5025caed21c 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java @@ -29,6 +29,8 @@ import org.apache.commons.lang3.StringUtils; +import lombok.extern.slf4j.Slf4j; + import java.util.Arrays; import java.util.Collections; import java.util.Comparator; @@ -39,6 +41,7 @@ import static com.google.common.base.Preconditions.checkNotNull; +@Slf4j public class DorisCatalogUtil { public static final String ALL_DATABASES_QUERY = @@ -184,12 +187,22 @@ public static String getCreateTableStatement( .filter(column -> !columnInTemplate.containsKey(column.getName())) .map(x -> DorisCatalogUtil.columnToDorisType(x, typeConverter)) .collect(Collectors.joining(",\n")); + + if (template.contains(SaveModePlaceHolder.TABLE_NAME.getPlaceHolder())) { + // TODO: Remove this compatibility config + template = + template.replaceAll( + SaveModePlaceHolder.TABLE_NAME.getReplacePlaceHolder(), + tablePath.getTableName()); + log.warn( + "The variable placeholder `${table_name}` has been marked as deprecated and will be removed soon, please use `${table}`"); + } + return template.replaceAll( SaveModePlaceHolder.DATABASE.getReplacePlaceHolder(), tablePath.getDatabaseName()) .replaceAll( - SaveModePlaceHolder.TABLE_NAME.getReplacePlaceHolder(), - tablePath.getTableName()) + SaveModePlaceHolder.TABLE.getReplacePlaceHolder(), tablePath.getTableName()) .replaceAll( SaveModePlaceHolder.ROWTYPE_FIELDS.getReplacePlaceHolder(), rowTypeFields); } diff --git a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/DorisCreateTableTest.java b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/DorisCreateTableTest.java index 5a74bcbf59e..09a5b6a3293 100644 --- a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/DorisCreateTableTest.java +++ b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/DorisCreateTableTest.java @@ -65,7 +65,7 @@ public void test() { String result = DorisCatalogUtil.getCreateTableStatement( - "CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` ( \n" + "CREATE TABLE IF NOT EXISTS `${database}`.`${table}` ( \n" + "${rowtype_primary_key} , \n" + "${rowtype_unique_key} , \n" + "`create_time` DATETIME NOT NULL , \n" @@ -237,7 +237,7 @@ public void testInSeq() { String result = DorisCatalogUtil.getCreateTableStatement( - "CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (\n" + "CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (\n" + "`L_COMMITDATE`,\n" + "${rowtype_primary_key},\n" + "L_SUPPKEY BIGINT NOT NULL,\n" @@ -301,7 +301,7 @@ public void testWithVarchar() { String result = DorisCatalogUtil.getCreateTableStatement( - "CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` ( \n" + "CREATE TABLE IF NOT EXISTS `${database}`.`${table}` ( \n" + "${rowtype_primary_key} , \n" + "`create_time` DATETIME NOT NULL , \n" + "${rowtype_fields} \n" @@ -363,7 +363,7 @@ public void testWithThreePrimaryKeys() { String result = DorisCatalogUtil.getCreateTableStatement( - "create table '${database}'.'${table_name}'(\n" + "create table '${database}'.'${table}'(\n" + " ${rowtype_fields}\n" + " )\n" + " partitioned by ${rowtype_primary_key};", diff --git a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSink.java b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSink.java index 99758c76f38..ad515aeeb7c 100644 --- a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSink.java +++ b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSink.java @@ -25,7 +25,6 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; -import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; import java.io.IOException; @@ -52,8 +51,7 @@ public DruidSink(ReadonlyConfig config, CatalogTable table) { } @Override - public AbstractSinkWriter createWriter(SinkWriter.Context context) - throws IOException { + public DruidWriter createWriter(SinkWriter.Context context) throws IOException { return new DruidWriter( seaTunnelRowType, config.get(COORDINATOR_URL), diff --git a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSinkFactory.java b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSinkFactory.java index 0f78ba0a582..0c6824b521e 100644 --- a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSinkFactory.java +++ b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSinkFactory.java @@ -20,9 +20,7 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; -import org.apache.seatunnel.api.sink.SinkReplaceNameConstant; import org.apache.seatunnel.api.table.catalog.CatalogTable; -import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSinkFactory; @@ -30,9 +28,6 @@ import com.google.auto.service.AutoService; -import java.util.HashMap; -import java.util.Map; - import static org.apache.seatunnel.connectors.druid.config.DruidConfig.COORDINATOR_URL; import static org.apache.seatunnel.connectors.druid.config.DruidConfig.DATASOURCE; @@ -52,46 +47,6 @@ public OptionRule optionRule() { public TableSink createSink(TableSinkFactoryContext context) { ReadonlyConfig readonlyConfig = context.getOptions(); CatalogTable catalogTable = context.getCatalogTable(); - - ReadonlyConfig finalReadonlyConfig = - generateCurrentReadonlyConfig(readonlyConfig, catalogTable); - return () -> new DruidSink(finalReadonlyConfig, catalogTable); - } - - private ReadonlyConfig generateCurrentReadonlyConfig( - ReadonlyConfig readonlyConfig, CatalogTable catalogTable) { - - Map configMap = readonlyConfig.toMap(); - - readonlyConfig - .getOptional(DATASOURCE) - .ifPresent( - tableName -> { - String replacedPath = - replaceCatalogTableInPath(tableName, catalogTable); - configMap.put(DATASOURCE.key(), replacedPath); - }); - - return ReadonlyConfig.fromMap(new HashMap<>(configMap)); - } - - private String replaceCatalogTableInPath(String originTableName, CatalogTable catalogTable) { - String tableName = originTableName; - TableIdentifier tableIdentifier = catalogTable.getTableId(); - if (tableIdentifier != null) { - if (tableIdentifier.getSchemaName() != null) { - tableName = - tableName.replace( - SinkReplaceNameConstant.REPLACE_SCHEMA_NAME_KEY, - tableIdentifier.getSchemaName()); - } - if (tableIdentifier.getTableName() != null) { - tableName = - tableName.replace( - SinkReplaceNameConstant.REPLACE_TABLE_NAME_KEY, - tableIdentifier.getTableName()); - } - } - return tableName; + return () -> new DruidSink(readonlyConfig, catalogTable); } } diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java index 6325a14e997..dee47bfd73e 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java @@ -71,8 +71,7 @@ public String getPluginName() { } @Override - public SinkWriter createWriter( - SinkWriter.Context context) { + public ElasticsearchSinkWriter createWriter(SinkWriter.Context context) { return new ElasticsearchSinkWriter( context, catalogTable, config, maxBatchSize, maxRetryCount); } diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java index 63770dd1d7f..56ec1d0ab7b 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java @@ -19,7 +19,6 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; -import org.apache.seatunnel.api.sink.SinkReplaceNameConstant; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.connector.TableSink; @@ -30,10 +29,6 @@ import com.google.auto.service.AutoService; -import java.util.HashMap; -import java.util.Map; - -import static org.apache.seatunnel.api.sink.SinkReplaceNameConstant.REPLACE_TABLE_NAME_KEY; import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.HOSTS; import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.PASSWORD; import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.TLS_KEY_STORE_PASSWORD; @@ -81,13 +76,7 @@ public OptionRule optionRule() { @Override public TableSink createSink(TableSinkFactoryContext context) { ReadonlyConfig readonlyConfig = context.getOptions(); - CatalogTable catalogTable = context.getCatalogTable(); - - ReadonlyConfig finalReadonlyConfig = - generateCurrentReadonlyConfig(readonlyConfig, catalogTable); - - String original = finalReadonlyConfig.get(INDEX); - + String original = readonlyConfig.get(INDEX); CatalogTable newTable = CatalogTable.of( TableIdentifier.of( @@ -95,41 +84,6 @@ public TableSink createSink(TableSinkFactoryContext context) { context.getCatalogTable().getTablePath().getDatabaseName(), original), context.getCatalogTable()); - return () -> new ElasticsearchSink(finalReadonlyConfig, newTable); - } - - private ReadonlyConfig generateCurrentReadonlyConfig( - ReadonlyConfig readonlyConfig, CatalogTable catalogTable) { - - Map configMap = readonlyConfig.toMap(); - - readonlyConfig - .getOptional(INDEX) - .ifPresent( - tableName -> { - String replacedPath = - replaceCatalogTableInPath(tableName, catalogTable); - configMap.put(INDEX.key(), replacedPath); - }); - - return ReadonlyConfig.fromMap(new HashMap<>(configMap)); - } - - private String replaceCatalogTableInPath(String originTableName, CatalogTable catalogTable) { - String tableName = originTableName; - TableIdentifier tableIdentifier = catalogTable.getTableId(); - if (tableIdentifier != null) { - if (tableIdentifier.getSchemaName() != null) { - tableName = - tableName.replace( - SinkReplaceNameConstant.REPLACE_SCHEMA_NAME_KEY, - tableIdentifier.getSchemaName()); - } - if (tableIdentifier.getTableName() != null) { - tableName = - tableName.replace(REPLACE_TABLE_NAME_KEY, tableIdentifier.getTableName()); - } - } - return tableName; + return () -> new ElasticsearchSink(readonlyConfig, newTable); } } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/factory/BaseMultipleTableFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/factory/BaseMultipleTableFileSinkFactory.java index 9f9f5f382f6..508b25c190d 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/factory/BaseMultipleTableFileSinkFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/factory/BaseMultipleTableFileSinkFactory.java @@ -17,76 +17,12 @@ package org.apache.seatunnel.connectors.seatunnel.file.factory; -import org.apache.seatunnel.shade.com.typesafe.config.Config; -import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory; - -import org.apache.seatunnel.api.configuration.ReadonlyConfig; -import org.apache.seatunnel.api.sink.SinkReplaceNameConstant; -import org.apache.seatunnel.api.table.catalog.CatalogTable; -import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.factory.TableSinkFactory; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig; import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo; import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo; import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState; public abstract class BaseMultipleTableFileSinkFactory implements TableSinkFactory< - SeaTunnelRow, FileSinkState, FileCommitInfo, FileAggregatedCommitInfo> { - - // replace the table name in sink config's path - public ReadonlyConfig generateCurrentReadonlyConfig( - ReadonlyConfig readonlyConfig, CatalogTable catalogTable) { - // Copy the config to avoid modifying the original config - Config config = readonlyConfig.toConfig(); - - if (config.hasPath(BaseSinkConfig.FILE_PATH.key())) { - String replacedPath = - replaceCatalogTableInPath( - config.getString(BaseSinkConfig.FILE_PATH.key()), catalogTable); - config = - config.withValue( - BaseSinkConfig.FILE_PATH.key(), - ConfigValueFactory.fromAnyRef(replacedPath)); - } - - if (config.hasPath(BaseSinkConfig.TMP_PATH.key())) { - String replacedPath = - replaceCatalogTableInPath( - config.getString(BaseSinkConfig.TMP_PATH.key()), catalogTable); - config = - config.withValue( - BaseSinkConfig.TMP_PATH.key(), - ConfigValueFactory.fromAnyRef(replacedPath)); - } - - return ReadonlyConfig.fromConfig(config); - } - - public String replaceCatalogTableInPath(String originString, CatalogTable catalogTable) { - String path = originString; - TableIdentifier tableIdentifier = catalogTable.getTableId(); - if (tableIdentifier != null) { - if (tableIdentifier.getDatabaseName() != null) { - path = - path.replace( - SinkReplaceNameConstant.REPLACE_DATABASE_NAME_KEY, - tableIdentifier.getDatabaseName()); - } - if (tableIdentifier.getSchemaName() != null) { - path = - path.replace( - SinkReplaceNameConstant.REPLACE_SCHEMA_NAME_KEY, - tableIdentifier.getSchemaName()); - } - if (tableIdentifier.getTableName() != null) { - path = - path.replace( - SinkReplaceNameConstant.REPLACE_TABLE_NAME_KEY, - tableIdentifier.getTableName()); - } - } - return path; - } -} + SeaTunnelRow, FileSinkState, FileCommitInfo, FileAggregatedCommitInfo> {} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java index 1ae4b840295..a48368be448 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java @@ -90,8 +90,7 @@ public SinkWriter restoreWriter( } @Override - public SinkWriter createWriter( - SinkWriter.Context context) { + public BaseFileSinkWriter createWriter(SinkWriter.Context context) { return new BaseFileSinkWriter(createWriteStrategy(), hadoopConf, context, jobId); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java index fc699b42962..e8ee8e436d1 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java @@ -104,9 +104,6 @@ public OptionRule optionRule() { createSink(TableSinkFactoryContext context) { ReadonlyConfig readonlyConfig = context.getOptions(); CatalogTable catalogTable = context.getCatalogTable(); - - ReadonlyConfig finalReadonlyConfig = - generateCurrentReadonlyConfig(readonlyConfig, catalogTable); - return () -> new LocalFileSink(finalReadonlyConfig, catalogTable); + return () -> new LocalFileSink(readonlyConfig, catalogTable); } } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java index edf6610714c..5d6cb649f20 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java @@ -42,10 +42,7 @@ public String factoryIdentifier() { public TableSink createSink(TableSinkFactoryContext context) { ReadonlyConfig readonlyConfig = context.getOptions(); CatalogTable catalogTable = context.getCatalogTable(); - - ReadonlyConfig finalReadonlyConfig = - generateCurrentReadonlyConfig(readonlyConfig, catalogTable); - return () -> new OssFileSink(finalReadonlyConfig, catalogTable); + return () -> new OssFileSink(readonlyConfig, catalogTable); } @Override diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java index 81ac7674043..4ac9f45915e 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java @@ -17,13 +17,9 @@ package org.apache.seatunnel.connectors.seatunnel.file.s3.sink; -import org.apache.seatunnel.shade.com.typesafe.config.Config; -import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory; - import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.table.catalog.CatalogTable; -import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSinkFactory; @@ -35,10 +31,6 @@ import com.google.auto.service.AutoService; -import static org.apache.seatunnel.api.sink.SinkReplaceNameConstant.REPLACE_DATABASE_NAME_KEY; -import static org.apache.seatunnel.api.sink.SinkReplaceNameConstant.REPLACE_SCHEMA_NAME_KEY; -import static org.apache.seatunnel.api.sink.SinkReplaceNameConstant.REPLACE_TABLE_NAME_KEY; - @AutoService(Factory.class) public class S3FileSinkFactory implements TableSinkFactory { @Override @@ -110,31 +102,14 @@ public OptionRule optionRule() { .optional(BaseSinkConfig.DATE_FORMAT) .optional(BaseSinkConfig.DATETIME_FORMAT) .optional(BaseSinkConfig.TIME_FORMAT) + .optional(BaseSinkConfig.TMP_PATH) .build(); } @Override public TableSink createSink(TableSinkFactoryContext context) { final CatalogTable catalogTable = context.getCatalogTable(); - final ReadonlyConfig options = context.getOptions(); - // get source table relevant information - TableIdentifier tableId = catalogTable.getTableId(); - String sourceDatabaseName = - tableId.getDatabaseName() == null ? "" : tableId.getDatabaseName(); - String sourceSchemaName = tableId.getSchemaName() == null ? "" : tableId.getSchemaName(); - String sourceTableName = tableId.getTableName() == null ? "" : tableId.getTableName(); - // get sink path - String path = options.get(S3ConfigOptions.FILE_PATH); - // to replace - path = path.replace(REPLACE_DATABASE_NAME_KEY, sourceDatabaseName); - path = path.replace(REPLACE_SCHEMA_NAME_KEY, sourceSchemaName); - path = path.replace(REPLACE_TABLE_NAME_KEY, sourceTableName); - // rebuild - Config config = options.toConfig(); - config = - config.withValue( - S3ConfigOptions.FILE_PATH.key(), ConfigValueFactory.fromAnyRef(path)); - ReadonlyConfig finalConfig = ReadonlyConfig.fromConfig(config); + final ReadonlyConfig finalConfig = context.getOptions(); return () -> new S3FileSink(catalogTable, finalConfig); } } diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java index b91c65de9bd..b5602c13f88 100644 --- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java +++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java @@ -179,14 +179,12 @@ public void setJobContext(JobContext jobContext) { } @Override - public SinkWriter restoreWriter( - SinkWriter.Context context, List states) { + public HiveSinkWriter restoreWriter(SinkWriter.Context context, List states) { return new HiveSinkWriter(getWriteStrategy(), hadoopConf, context, jobId, states); } @Override - public SinkWriter createWriter( - SinkWriter.Context context) { + public HiveSinkWriter createWriter(SinkWriter.Context context) { return new HiveSinkWriter(getWriteStrategy(), hadoopConf, context, jobId); } diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java index e53aed86fc6..313ee38b836 100644 --- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java +++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java @@ -19,9 +19,7 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; -import org.apache.seatunnel.api.sink.SinkReplaceNameConstant; import org.apache.seatunnel.api.table.catalog.CatalogTable; -import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSinkFactory; @@ -36,9 +34,6 @@ import com.google.auto.service.AutoService; -import java.util.HashMap; -import java.util.Map; - @AutoService(Factory.class) public class HiveSinkFactory implements TableSinkFactory< @@ -63,57 +58,11 @@ public OptionRule optionRule() { createSink(TableSinkFactoryContext context) { ReadonlyConfig readonlyConfig = context.getOptions(); CatalogTable catalogTable = context.getCatalogTable(); - - ReadonlyConfig finalReadonlyConfig = - generateCurrentReadonlyConfig(readonlyConfig, catalogTable); - return () -> new HiveSink(finalReadonlyConfig, catalogTable); + return () -> new HiveSink(readonlyConfig, catalogTable); } @Override public String factoryIdentifier() { return HiveConstants.CONNECTOR_NAME; } - - private ReadonlyConfig generateCurrentReadonlyConfig( - ReadonlyConfig readonlyConfig, CatalogTable catalogTable) { - - Map configMap = readonlyConfig.toMap(); - - readonlyConfig - .getOptional(HiveSinkOptions.TABLE_NAME) - .ifPresent( - tableName -> { - String replacedPath = - replaceCatalogTableInPath(tableName, catalogTable); - configMap.put(HiveSinkOptions.TABLE_NAME.key(), replacedPath); - }); - - return ReadonlyConfig.fromMap(new HashMap<>(configMap)); - } - - private String replaceCatalogTableInPath(String originTableName, CatalogTable catalogTable) { - String tableName = originTableName; - TableIdentifier tableIdentifier = catalogTable.getTableId(); - if (tableIdentifier != null) { - if (tableIdentifier.getDatabaseName() != null) { - tableName = - tableName.replace( - SinkReplaceNameConstant.REPLACE_DATABASE_NAME_KEY, - tableIdentifier.getDatabaseName()); - } - if (tableIdentifier.getSchemaName() != null) { - tableName = - tableName.replace( - SinkReplaceNameConstant.REPLACE_SCHEMA_NAME_KEY, - tableIdentifier.getSchemaName()); - } - if (tableIdentifier.getTableName() != null) { - tableName = - tableName.replace( - SinkReplaceNameConstant.REPLACE_TABLE_NAME_KEY, - tableIdentifier.getTableName()); - } - } - return tableName; - } } diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java index da1cb0a8dad..9dfe688c118 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java @@ -28,7 +28,6 @@ import org.apache.seatunnel.common.config.CheckResult; import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; -import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig; import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter; import org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorException; @@ -81,8 +80,7 @@ public String getPluginName() { } @Override - public AbstractSinkWriter createWriter(SinkWriter.Context context) - throws IOException { + public HttpSinkWriter createWriter(SinkWriter.Context context) throws IOException { return new HttpSinkWriter(seaTunnelRowType, httpParameter); } } diff --git a/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSink.java b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSink.java index ca6459bee15..f438167c39d 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSink.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSink.java @@ -20,9 +20,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; import org.apache.seatunnel.api.sink.SinkWriter; -import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; import org.apache.seatunnel.connectors.seatunnel.http.sink.HttpSink; import org.apache.seatunnel.connectors.seatunnel.http.sink.HttpSinkWriter; import org.apache.seatunnel.connectors.seatunnel.wechat.sink.config.WeChatSinkConfig; @@ -39,7 +37,7 @@ public String getPluginName() { } @Override - public AbstractSinkWriter createWriter(SinkWriter.Context context) { + public HttpSinkWriter createWriter(SinkWriter.Context context) { return new HttpSinkWriter( seaTunnelRowType, super.httpParameter, diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSink.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSink.java index 9e6ddfee862..4065338bbff 100644 --- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSink.java +++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSink.java @@ -60,8 +60,8 @@ public String getPluginName() { } @Override - public SinkWriter restoreWriter( - SinkWriter.Context context, List states) throws IOException { + public HudiSinkWriter restoreWriter(SinkWriter.Context context, List states) + throws IOException { return new HudiSinkWriter(context, seaTunnelRowType, hudiSinkConfig, states); } @@ -87,8 +87,7 @@ public Optional> getAggregatedCommitInfoSer } @Override - public SinkWriter createWriter( - SinkWriter.Context context) throws IOException { + public HudiSinkWriter createWriter(SinkWriter.Context context) throws IOException { return new HudiSinkWriter(context, seaTunnelRowType, hudiSinkConfig, new ArrayList<>()); } } diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java index ad92aa1d75f..008ab799b9d 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java @@ -88,8 +88,7 @@ public String getPluginName() { } @Override - public SinkWriter createWriter( - SinkWriter.Context context) throws IOException { + public IcebergSinkWriter createWriter(SinkWriter.Context context) throws IOException { return IcebergSinkWriter.of(config, catalogTable); } diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkFactory.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkFactory.java index 3441420226c..b32430b3197 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkFactory.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkFactory.java @@ -35,12 +35,6 @@ @AutoService(Factory.class) public class IcebergSinkFactory implements TableSinkFactory { - public static final String REPLACE_TABLE_NAME_KEY = "${table_name}"; - - public static final String REPLACE_SCHEMA_NAME_KEY = "${schema_name}"; - - public static final String REPLACE_DATABASE_NAME_KEY = "${database_name}"; - @Override public String factoryIdentifier() { return "Iceberg"; @@ -80,13 +74,13 @@ private CatalogTable renameCatalogTable(SinkConfig sinkConfig, CatalogTable cata String tableName; String namespace; if (StringUtils.isNotEmpty(sinkConfig.getTable())) { - tableName = replaceName(sinkConfig.getTable(), tableId); + tableName = sinkConfig.getTable(); } else { tableName = tableId.getTableName(); } if (StringUtils.isNotEmpty(sinkConfig.getNamespace())) { - namespace = replaceName(sinkConfig.getNamespace(), tableId); + namespace = sinkConfig.getNamespace(); } else { namespace = tableId.getSchemaName(); } @@ -97,17 +91,4 @@ private CatalogTable renameCatalogTable(SinkConfig sinkConfig, CatalogTable cata return CatalogTable.of(newTableId, catalogTable); } - - private String replaceName(String original, TableIdentifier tableId) { - if (tableId.getTableName() != null) { - original = original.replace(REPLACE_TABLE_NAME_KEY, tableId.getTableName()); - } - if (tableId.getSchemaName() != null) { - original = original.replace(REPLACE_SCHEMA_NAME_KEY, tableId.getSchemaName()); - } - if (tableId.getDatabaseName() != null) { - original = original.replace(REPLACE_DATABASE_NAME_KEY, tableId.getDatabaseName()); - } - return original; - } } diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java index da7ba20f91d..4d940f63cc3 100644 --- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java @@ -23,7 +23,6 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; -import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig; import java.io.IOException; @@ -45,8 +44,7 @@ public InfluxDBSink(SinkConfig sinkConfig, CatalogTable catalogTable) { } @Override - public AbstractSinkWriter createWriter(SinkWriter.Context context) - throws IOException { + public InfluxDBSinkWriter createWriter(SinkWriter.Context context) throws IOException { return new InfluxDBSinkWriter(sinkConfig, seaTunnelRowType); } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java index af651beb7c2..ca7c457b7db 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.sink; import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter; import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.catalog.TableSchema; @@ -49,8 +50,9 @@ import java.util.List; @Slf4j -public abstract class AbstractJdbcSinkWriter - implements SinkWriter { +public abstract class AbstractJdbcSinkWriter + implements SinkWriter, + SupportMultiTableSinkWriter { protected JdbcDialect dialect; protected TablePath sinkTablePath; diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java index 31c89dc21bf..1fe8d915826 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java @@ -19,7 +19,6 @@ import org.apache.seatunnel.api.common.JobContext; import org.apache.seatunnel.api.sink.SinkWriter; -import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter; import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.type.SeaTunnelRow; @@ -53,8 +52,7 @@ import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument; import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkState; -public class JdbcExactlyOnceSinkWriter extends AbstractJdbcSinkWriter - implements SupportMultiTableSinkWriter { +public class JdbcExactlyOnceSinkWriter extends AbstractJdbcSinkWriter { private static final Logger LOG = LoggerFactory.getLogger(JdbcExactlyOnceSinkWriter.class); private final SinkWriter.Context sinkcontext; diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java index 946956a428a..a6a162f472c 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java @@ -100,10 +100,9 @@ public String getPluginName() { } @Override - public SinkWriter createWriter( - SinkWriter.Context context) { + public AbstractJdbcSinkWriter createWriter(SinkWriter.Context context) { TablePath sinkTablePath = catalogTable.getTablePath(); - SinkWriter sinkWriter; + AbstractJdbcSinkWriter sinkWriter; if (jdbcSinkConfig.isExactlyOnce()) { sinkWriter = new JdbcExactlyOnceSinkWriter( diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java index 4331b53d0a0..3f43b2088d0 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java @@ -18,7 +18,6 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.sink; import org.apache.seatunnel.api.sink.MultiTableResourceManager; -import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter; import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.type.SeaTunnelRow; @@ -42,8 +41,7 @@ import java.util.Optional; @Slf4j -public class JdbcSinkWriter extends AbstractJdbcSinkWriter - implements SupportMultiTableSinkWriter { +public class JdbcSinkWriter extends AbstractJdbcSinkWriter { private final Integer primaryKeyIndex; public JdbcSinkWriter( diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java index 898016b5cf8..def4a2b3668 100644 --- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java +++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java @@ -54,8 +54,7 @@ public String getPluginName() { } @Override - public SinkWriter createWriter( - SinkWriter.Context context) throws IOException { + public KuduSinkWriter createWriter(SinkWriter.Context context) throws IOException { return new KuduSinkWriter(seaTunnelRowType, kuduSinkConfig); } } diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java index bc96fdcd78e..23651994ad3 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java @@ -92,8 +92,7 @@ public String getPluginName() { } @Override - public SinkWriter createWriter( - SinkWriter.Context context) throws IOException { + public PaimonSinkWriter createWriter(SinkWriter.Context context) throws IOException { return new PaimonSinkWriter( context, table, seaTunnelRowType, jobContext, paimonHadoopConfiguration); } diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java index 46b92afb097..83976d84f94 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java @@ -36,12 +36,6 @@ @AutoService(Factory.class) public class PaimonSinkFactory implements TableSinkFactory { - public static final String REPLACE_TABLE_NAME_KEY = "${table_name}"; - - public static final String REPLACE_SCHEMA_NAME_KEY = "${schema_name}"; - - public static final String REPLACE_DATABASE_NAME_KEY = "${database_name}"; - @Override public String factoryIdentifier() { return "Paimon"; @@ -80,13 +74,13 @@ private CatalogTable renameCatalogTable( String tableName; String namespace; if (StringUtils.isNotEmpty(paimonSinkConfig.getTable())) { - tableName = replaceName(paimonSinkConfig.getTable(), tableId); + tableName = paimonSinkConfig.getTable(); } else { tableName = tableId.getTableName(); } if (StringUtils.isNotEmpty(paimonSinkConfig.getNamespace())) { - namespace = replaceName(paimonSinkConfig.getNamespace(), tableId); + namespace = paimonSinkConfig.getNamespace(); } else { namespace = tableId.getSchemaName(); } @@ -97,17 +91,4 @@ private CatalogTable renameCatalogTable( return CatalogTable.of(newTableId, catalogTable); } - - private String replaceName(String original, TableIdentifier tableId) { - if (tableId.getTableName() != null) { - original = original.replace(REPLACE_TABLE_NAME_KEY, tableId.getTableName()); - } - if (tableId.getSchemaName() != null) { - original = original.replace(REPLACE_SCHEMA_NAME_KEY, tableId.getSchemaName()); - } - if (tableId.getDatabaseName() != null) { - original = original.replace(REPLACE_DATABASE_NAME_KEY, tableId.getDatabaseName()); - } - return original; - } } diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java index 7e6d23dbec8..a87ee1ebf75 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java @@ -24,7 +24,6 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; -import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig; import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters; @@ -50,8 +49,7 @@ public String getPluginName() { } @Override - public AbstractSinkWriter createWriter(SinkWriter.Context context) - throws IOException { + public RedisSinkWriter createWriter(SinkWriter.Context context) throws IOException { return new RedisSinkWriter(seaTunnelRowType, redisParameters); } } diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java index 937284cd668..bb34aaa5d14 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java @@ -61,7 +61,7 @@ public interface StarRocksSinkOptions { "CREATE TABLE IF NOT EXISTS `" + SaveModePlaceHolder.DATABASE.getPlaceHolder() + "`.`" - + SaveModePlaceHolder.TABLE_NAME.getPlaceHolder() + + SaveModePlaceHolder.TABLE.getPlaceHolder() + "` (\n" + SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder() + ",\n" diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java index 0c2718d0b8a..7fd3af17e72 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java @@ -30,6 +30,8 @@ import org.apache.commons.lang3.StringUtils; +import lombok.extern.slf4j.Slf4j; + import java.util.Comparator; import java.util.List; import java.util.Map; @@ -38,6 +40,7 @@ import static com.google.common.base.Preconditions.checkNotNull; +@Slf4j public class StarRocksSaveModeUtil { public static String getCreateTableSql( @@ -86,8 +89,18 @@ public static String getCreateTableSql( .filter(column -> !columnInTemplate.containsKey(column.getName())) .map(StarRocksSaveModeUtil::columnToStarrocksType) .collect(Collectors.joining(",\n")); + + if (template.contains(SaveModePlaceHolder.TABLE_NAME.getPlaceHolder())) { + // TODO: Remove this compatibility config + template = + template.replaceAll( + SaveModePlaceHolder.TABLE_NAME.getReplacePlaceHolder(), table); + log.warn( + "The variable placeholder `${table_name}` has been marked as deprecated and will be removed soon, please use `${table}`"); + } + return template.replaceAll(SaveModePlaceHolder.DATABASE.getReplacePlaceHolder(), database) - .replaceAll(SaveModePlaceHolder.TABLE_NAME.getReplacePlaceHolder(), table) + .replaceAll(SaveModePlaceHolder.TABLE.getReplacePlaceHolder(), table) .replaceAll( SaveModePlaceHolder.ROWTYPE_FIELDS.getReplacePlaceHolder(), rowTypeFields); } diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java index f05f912b6f6..51f7486569b 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java @@ -34,9 +34,9 @@ import com.google.auto.service.AutoService; -import static org.apache.seatunnel.api.sink.SinkReplaceNameConstant.REPLACE_DATABASE_NAME_KEY; -import static org.apache.seatunnel.api.sink.SinkReplaceNameConstant.REPLACE_SCHEMA_NAME_KEY; -import static org.apache.seatunnel.api.sink.SinkReplaceNameConstant.REPLACE_TABLE_NAME_KEY; +import java.util.Arrays; +import java.util.List; + import static org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions.DATA_SAVE_MODE; @AutoService(Factory.class) @@ -73,56 +73,33 @@ public OptionRule optionRule() { .build(); } + @Override + public List excludeTablePlaceholderReplaceKeys() { + return Arrays.asList(StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key()); + } + @Override public TableSink createSink(TableSinkFactoryContext context) { - SinkConfig sinkConfig = SinkConfig.of(context.getOptions()); CatalogTable catalogTable = context.getCatalogTable(); + SinkConfig sinkConfig = SinkConfig.of(context.getOptions()); if (StringUtils.isBlank(sinkConfig.getTable())) { sinkConfig.setTable(catalogTable.getTableId().getTableName()); } - // get source table relevant information - TableIdentifier tableId = catalogTable.getTableId(); - String sourceDatabaseName = tableId.getDatabaseName(); - String sourceSchemaName = tableId.getSchemaName(); - String sourceTableName = tableId.getTableName(); - // get sink table relevant information - String sinkDatabaseName = sinkConfig.getDatabase(); - String sinkTableName = sinkConfig.getTable(); - // to replace - sinkDatabaseName = - sinkDatabaseName.replace( - REPLACE_DATABASE_NAME_KEY, - sourceDatabaseName != null ? sourceDatabaseName : ""); - String finalTableName = this.replaceFullTableName(sinkTableName, tableId); - // rebuild TableIdentifier and catalogTable - TableIdentifier newTableId = + + TableIdentifier rewriteTableId = TableIdentifier.of( - tableId.getCatalogName(), sinkDatabaseName, null, finalTableName); - catalogTable = + catalogTable.getTableId().getCatalogName(), + sinkConfig.getDatabase(), + null, + sinkConfig.getTable()); + CatalogTable finalCatalogTable = CatalogTable.of( - newTableId, + rewriteTableId, catalogTable.getTableSchema(), catalogTable.getOptions(), catalogTable.getPartitionKeys(), catalogTable.getCatalogName()); - CatalogTable finalCatalogTable = catalogTable; - // reset - sinkConfig.setTable(finalTableName); - sinkConfig.setDatabase(sinkDatabaseName); return () -> new StarRocksSink(sinkConfig, finalCatalogTable, context.getOptions()); } - - private String replaceFullTableName(String original, TableIdentifier tableId) { - if (StringUtils.isNotBlank(tableId.getDatabaseName())) { - original = original.replace(REPLACE_DATABASE_NAME_KEY, tableId.getDatabaseName()); - } - if (StringUtils.isNotBlank(tableId.getSchemaName())) { - original = original.replace(REPLACE_SCHEMA_NAME_KEY, tableId.getSchemaName()); - } - if (StringUtils.isNotBlank(tableId.getTableName())) { - original = original.replace(REPLACE_TABLE_NAME_KEY, tableId.getTableName()); - } - return original; - } } diff --git a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java index d7f759de2ac..fc3d15c4b4a 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java @@ -64,7 +64,7 @@ public void test() { String result = StarRocksSaveModeUtil.getCreateTableSql( - "CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` ( \n" + "CREATE TABLE IF NOT EXISTS `${database}`.`${table}` ( \n" + "${rowtype_primary_key} , \n" + "${rowtype_unique_key} , \n" + "`create_time` DATETIME NOT NULL , \n" @@ -232,7 +232,7 @@ public void testInSeq() { String result = StarRocksSaveModeUtil.getCreateTableSql( - "CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (\n" + "CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (\n" + "`L_COMMITDATE`,\n" + "${rowtype_primary_key},\n" + "L_SUPPKEY BIGINT NOT NULL,\n" @@ -289,7 +289,7 @@ public void testWithVarchar() { String result = StarRocksSaveModeUtil.getCreateTableSql( - "CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` ( \n" + "CREATE TABLE IF NOT EXISTS `${database}`.`${table}` ( \n" + "${rowtype_primary_key} , \n" + "`create_time` DATETIME NOT NULL , \n" + "${rowtype_fields} \n" @@ -346,7 +346,7 @@ public void testWithThreePrimaryKeys() { String result = StarRocksSaveModeUtil.getCreateTableSql( - "create table '${database}'.'${table_name}'(\n" + "create table '${database}'.'${table}'(\n" + " ${rowtype_fields}\n" + " )\n" + " partitioned by ${rowtype_primary_key};", diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java index f775bfb46f3..6a272aadb21 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java @@ -111,10 +111,12 @@ public List execute(List upstreamDataS sink.setTypeInfo(sourceType); } else { TableSinkFactoryContext context = - new TableSinkFactoryContext( + TableSinkFactoryContext.replacePlaceholderAndCreate( stream.getCatalogTable(), ReadonlyConfig.fromConfig(sinkConfig), - classLoader); + classLoader, + ((TableSinkFactory) factory.get()) + .excludeTablePlaceholderReplaceKeys()); ConfigValidator.of(context.getOptions()).validate(factory.get().optionRule()); sink = ((TableSinkFactory) factory.get()).createSink(context).createSink(); sink.setJobContext(jobContext); diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java index 6257a94dde7..14247464551 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java @@ -112,10 +112,12 @@ public List execute(List upstreamDataS sink.setTypeInfo(sourceType); } else { TableSinkFactoryContext context = - new TableSinkFactoryContext( + TableSinkFactoryContext.replacePlaceholderAndCreate( stream.getCatalogTable(), ReadonlyConfig.fromConfig(sinkConfig), - classLoader); + classLoader, + ((TableSinkFactory) factory.get()) + .excludeTablePlaceholderReplaceKeys()); ConfigValidator.of(context.getOptions()).validate(factory.get().optionRule()); sink = ((TableSinkFactory) factory.get()).createSink(context).createSink(); sink.setJobContext(jobContext); diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java index d080c21fa79..7751286b227 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java @@ -129,10 +129,12 @@ public List execute(List upstreamDataStreams sink.setTypeInfo((SeaTunnelRowType) inputType); } else { TableSinkFactoryContext context = - new TableSinkFactoryContext( + TableSinkFactoryContext.replacePlaceholderAndCreate( datasetTableInfo.getCatalogTable(), ReadonlyConfig.fromConfig(sinkConfig), - classLoader); + classLoader, + ((TableSinkFactory) factory.get()) + .excludeTablePlaceholderReplaceKeys()); ConfigValidator.of(context.getOptions()).validate(factory.get().optionRule()); sink = ((TableSinkFactory) factory.get()).createSink(context).createSink(); sink.setJobContext(jobContext); diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java index 08fe4162bcb..46b3233b00e 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java @@ -130,10 +130,12 @@ public List execute(List upstreamDataStreams sink.setTypeInfo((SeaTunnelRowType) inputType); } else { TableSinkFactoryContext context = - new TableSinkFactoryContext( + TableSinkFactoryContext.replacePlaceholderAndCreate( datasetTableInfo.getCatalogTable(), ReadonlyConfig.fromConfig(sinkConfig), - classLoader); + classLoader, + ((TableSinkFactory) factory.get()) + .excludeTablePlaceholderReplaceKeys()); ConfigValidator.of(context.getOptions()).validate(factory.get().optionRule()); sink = ((TableSinkFactory) factory.get()).createSink(context).createSink(); sink.setJobContext(jobContext); diff --git a/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java b/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java index 6d59ff27f56..62a037a6f65 100644 --- a/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java +++ b/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java @@ -18,6 +18,9 @@ package org.apache.seatunnel.api.connector; import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.sink.SupportMultiTableSink; +import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.table.factory.FactoryUtil; import org.apache.seatunnel.api.table.factory.TableSinkFactory; @@ -123,8 +126,8 @@ public void testAllConnectorImplementFactoryWithUpToDateMethod() throws ClassNot sinkWithSPI.containsKey(factory.factoryIdentifier()), "Please remove `@AutoService(SeaTunnelSink.class)` annotation in " + sinkWithSPI.get(factory.factoryIdentifier())); - Class sinkClass = - (Class) + Class sinkClass = + (Class) Class.forName( factory.getClass() .getName() @@ -148,7 +151,31 @@ public void testAllConnectorImplementFactoryWithUpToDateMethod() throws ClassNot "Please remove `getConsumedType` method in " + sinkClass.getSimpleName()); log.info( "Check sink connector {} successfully", factory.getClass().getSimpleName()); + + checkSupportMultiTableSink(sinkClass); } } } + + private void checkSupportMultiTableSink(Class sinkClass) { + if (!SupportMultiTableSink.class.isAssignableFrom(sinkClass)) { + return; + } + + // Validate the `createWriter` method return type + Optional createWriter = + ReflectionUtils.getDeclaredMethod( + sinkClass, "createWriter", SinkWriter.Context.class); + Assertions.assertTrue( + createWriter.isPresent(), + "Please add `createWriter` method in " + sinkClass.getSimpleName()); + Class createWriterClass = + (Class) createWriter.get().getReturnType(); + Assertions.assertTrue( + SupportMultiTableSinkWriter.class.isAssignableFrom(createWriterClass), + String.format( + "Please update the `createWriter` method return type to the subclass of `SupportMultiTableSinkWriter`, " + + "because `%s` implements `SupportMultiTableSink` interface", + sinkClass.getSimpleName())); + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_custom_primary_key.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_custom_primary_key.conf index 1d1c1c80c7e..427b98fc5c6 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_custom_primary_key.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_custom_primary_key.conf @@ -54,7 +54,10 @@ sink { driver = "com.mysql.cj.jdbc.Driver" user = "st_user_sink" password = "mysqlpw" + database = "mysql_cdc2" + table = "${table_name}" + primary_keys = ["${primary_key}"] generate_sink_sql = true } } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf index c382c1c5867..f2b513e5ba7 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf @@ -49,7 +49,10 @@ sink { driver = "com.mysql.cj.jdbc.Driver" user = "st_user_sink" password = "mysqlpw" + database = "mysql_cdc2" + table = "${table_name}" + primary_keys = ["${primary_key}"] generate_sink_sql = true } } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_multi_table_mode_two_table.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_multi_table_mode_two_table.conf index cb10cf26447..6c93ceda100 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_multi_table_mode_two_table.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_multi_table_mode_two_table.conf @@ -49,7 +49,10 @@ sink { driver = "com.mysql.cj.jdbc.Driver" user = "st_user_sink" password = "mysqlpw" + database = "mysql_cdc2" + table = "${table_name}" + primary_keys = ["${primary_key}"] generate_sink_sql = true } } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java index 290da4381ab..f8550a615af 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java @@ -239,8 +239,11 @@ private CatalogTable assertCreateTable( CatalogTable upstreamTable, ReadonlyConfig config, String fullName) { DorisSinkFactory dorisSinkFactory = new DorisSinkFactory(); TableSinkFactoryContext context = - new TableSinkFactoryContext( - upstreamTable, config, Thread.currentThread().getContextClassLoader()); + TableSinkFactoryContext.replacePlaceholderAndCreate( + upstreamTable, + config, + Thread.currentThread().getContextClassLoader(), + Collections.emptyList()); SupportSaveMode sink = (SupportSaveMode) dorisSinkFactory.createSink(context).createSink(); sink.getSaveModeHandler().get().handleSaveMode(); CatalogTable createdTable = catalog.getTable(TablePath.of(fullName)); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris_source_to_doris_sink_type_convertor.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris_source_to_doris_sink_type_convertor.conf index a162df721de..fce6214d9f8 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris_source_to_doris_sink_type_convertor.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris_source_to_doris_sink_type_convertor.conf @@ -46,6 +46,6 @@ sink{ format="json" read_json_by_line="true" } - save_mode_create_template = """CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (${rowtype_fields}) ENGINE=OLAP duplicate KEY (${rowtype_duplicate_key}) DISTRIBUTED BY HASH (${rowtype_duplicate_key}) PROPERTIES ("replication_allocation" = "tag.location.default: 1")""" + save_mode_create_template = """CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (${rowtype_fields}) ENGINE=OLAP duplicate KEY (${rowtype_duplicate_key}) DISTRIBUTED BY HASH (${rowtype_duplicate_key}) PROPERTIES ("replication_allocation" = "tag.location.default: 1")""" } } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/fake_source_and_doris_sink_timeout_error.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/fake_source_and_doris_sink_timeout_error.conf index 3919a236cf5..ded06046b99 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/fake_source_and_doris_sink_timeout_error.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/fake_source_and_doris_sink_timeout_error.conf @@ -67,6 +67,6 @@ sink{ format="json" read_json_by_line="true" } - save_mode_create_template = """CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (${rowtype_fields}) ENGINE=OLAP unique KEY (`F_ID`) DISTRIBUTED BY HASH (`F_ID`) PROPERTIES ("replication_allocation" = "tag.location.default: 1")""" + save_mode_create_template = """CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (${rowtype_fields}) ENGINE=OLAP unique KEY (`F_ID`) DISTRIBUTED BY HASH (`F_ID`) PROPERTIES ("replication_allocation" = "tag.location.default: 1")""" } } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java index c98e9a1ff33..4c2ecc94e39 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java @@ -67,6 +67,7 @@ import java.time.LocalDateTime; import java.time.LocalTime; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -479,8 +480,11 @@ void defaultSinkParametersTest() throws IOException, SQLException, ClassNotFound map1.put("url", getUrl()); ReadonlyConfig config1 = ReadonlyConfig.fromMap(map1); TableSinkFactoryContext context1 = - new TableSinkFactoryContext( - catalogTable, config1, Thread.currentThread().getContextClassLoader()); + TableSinkFactoryContext.replacePlaceholderAndCreate( + catalogTable, + config1, + Thread.currentThread().getContextClassLoader(), + Collections.emptyList()); JdbcSink jdbcSink1 = (JdbcSink) new JdbcSinkFactory().createSink(context1).createSink(); Properties connectionProperties1 = getSinkProperties(jdbcSink1); Assertions.assertEquals(connectionProperties1.get("rewriteBatchedStatements"), "true"); @@ -490,8 +494,11 @@ void defaultSinkParametersTest() throws IOException, SQLException, ClassNotFound map2.put("url", getUrl() + "?rewriteBatchedStatements=false"); ReadonlyConfig config2 = ReadonlyConfig.fromMap(map2); TableSinkFactoryContext context2 = - new TableSinkFactoryContext( - catalogTable, config2, Thread.currentThread().getContextClassLoader()); + TableSinkFactoryContext.replacePlaceholderAndCreate( + catalogTable, + config2, + Thread.currentThread().getContextClassLoader(), + Collections.emptyList()); JdbcSink jdbcSink2 = (JdbcSink) new JdbcSinkFactory().createSink(context2).createSink(); Properties connectionProperties2 = getSinkProperties(jdbcSink2); Assertions.assertEquals(connectionProperties2.get("rewriteBatchedStatements"), "false"); @@ -504,8 +511,11 @@ void defaultSinkParametersTest() throws IOException, SQLException, ClassNotFound map3.put("url", getUrl()); ReadonlyConfig config3 = ReadonlyConfig.fromMap(map3); TableSinkFactoryContext context3 = - new TableSinkFactoryContext( - catalogTable, config3, Thread.currentThread().getContextClassLoader()); + TableSinkFactoryContext.replacePlaceholderAndCreate( + catalogTable, + config3, + Thread.currentThread().getContextClassLoader(), + Collections.emptyList()); JdbcSink jdbcSink3 = (JdbcSink) new JdbcSinkFactory().createSink(context3).createSink(); Properties connectionProperties3 = getSinkProperties(jdbcSink3); Assertions.assertEquals(connectionProperties3.get("rewriteBatchedStatements"), "false"); @@ -519,8 +529,11 @@ void defaultSinkParametersTest() throws IOException, SQLException, ClassNotFound map4.put("url", getUrl() + "?useSSL=false&rewriteBatchedStatements=true"); ReadonlyConfig config4 = ReadonlyConfig.fromMap(map4); TableSinkFactoryContext context4 = - new TableSinkFactoryContext( - catalogTable, config4, Thread.currentThread().getContextClassLoader()); + TableSinkFactoryContext.replacePlaceholderAndCreate( + catalogTable, + config4, + Thread.currentThread().getContextClassLoader(), + Collections.emptyList()); JdbcSink jdbcSink4 = (JdbcSink) new JdbcSinkFactory().createSink(context4).createSink(); Properties connectionProperties4 = getSinkProperties(jdbcSink4); Assertions.assertEquals(connectionProperties4.get("useSSL"), "true"); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_with_multiple_tables.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_with_multiple_tables.conf index 4a96aff71bb..0a75209b026 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_with_multiple_tables.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_with_multiple_tables.conf @@ -54,6 +54,7 @@ sink { password = "Abc!@#135_seatunnel" database = "sink" + table = "${table_name}" generate_sink_sql = true } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_with_multiple_tables.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_with_multiple_tables.sql index 8fb483defce..a9b02e2ae3a 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_with_multiple_tables.sql +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_with_multiple_tables.sql @@ -56,6 +56,7 @@ CREATE TABLE sink_table WITH ( 'password' = 'Abc!@#135_seatunnel', 'generate_sink_sql' = 'true', 'database' = 'sink' + 'table' = '${table_name}' ); -- If it's multi-table synchronization, there's no need to set select columns. diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java index 783b0416ba7..a536cf02318 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java @@ -361,7 +361,7 @@ public void testCatalog() { "root", PASSWORD, String.format(URL, starRocksServer.getHost()), - "CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (\n ${rowtype_fields}\n ) ENGINE=OLAP \n DUPLICATE KEY(`BIGINT_COL`) \n DISTRIBUTED BY HASH (BIGINT_COL) BUCKETS 1 \n PROPERTIES (\n \"replication_num\" = \"1\", \n \"in_memory\" = \"false\" , \n \"storage_format\" = \"DEFAULT\" \n )"); + "CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (\n ${rowtype_fields}\n ) ENGINE=OLAP \n DUPLICATE KEY(`BIGINT_COL`) \n DISTRIBUTED BY HASH (BIGINT_COL) BUCKETS 1 \n PROPERTIES (\n \"replication_num\" = \"1\", \n \"in_memory\" = \"false\" , \n \"storage_format\" = \"DEFAULT\" \n )"); starRocksCatalog.open(); CatalogTable catalogTable = starRocksCatalog.getTable(tablePathStarRocksSource); // sink tableExists ? diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-thrift-to-starrocks-streamload.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-thrift-to-starrocks-streamload.conf index 91f7b0402db..ca47a8eb08c 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-thrift-to-starrocks-streamload.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-thrift-to-starrocks-streamload.conf @@ -69,7 +69,7 @@ sink { } "schema_save_mode"="RECREATE_SCHEMA" "data_save_mode"="APPEND_DATA" - save_mode_create_template = "CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (\n ${rowtype_fields}\n ) ENGINE=OLAP \n DUPLICATE KEY(`BIGINT_COL`) \n DISTRIBUTED BY HASH (BIGINT_COL) BUCKETS 1 \n PROPERTIES (\n \"replication_num\" = \"1\", \n \"in_memory\" = \"false\" , \n \"storage_format\" = \"DEFAULT\" \n )" + save_mode_create_template = "CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (\n ${rowtype_fields}\n ) ENGINE=OLAP \n DUPLICATE KEY(`BIGINT_COL`) \n DISTRIBUTED BY HASH (BIGINT_COL) BUCKETS 1 \n PROPERTIES (\n \"replication_num\" = \"1\", \n \"in_memory\" = \"false\" , \n \"storage_format\" = \"DEFAULT\" \n )" } } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java index 1ab973652f9..1c5b9fe398c 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java @@ -28,6 +28,8 @@ import com.google.auto.service.AutoService; +import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument; + @AutoService(Factory.class) public class InMemorySinkFactory implements TableSinkFactory< @@ -43,6 +45,10 @@ public class InMemorySinkFactory public static final Option THROW_EXCEPTION_OF_COMMITTER = Options.key("throw_exception_of_committer").booleanType().defaultValue(false); + public static final Option ASSERT_OPTIONS_KEY = + Options.key("assert_options_key").stringType().noDefaultValue(); + public static final Option ASSERT_OPTIONS_VALUE = + Options.key("assert_options_value").stringType().noDefaultValue(); @Override public String factoryIdentifier() { @@ -56,13 +62,23 @@ public OptionRule optionRule() { THROW_EXCEPTION, THROW_OUT_OF_MEMORY, CHECKPOINT_SLEEP, - THROW_EXCEPTION_OF_COMMITTER) + THROW_EXCEPTION_OF_COMMITTER, + ASSERT_OPTIONS_KEY, + ASSERT_OPTIONS_VALUE) .build(); } @Override public TableSink createSink(TableSinkFactoryContext context) { + if (context.getOptions().getOptional(ASSERT_OPTIONS_KEY).isPresent()) { + String key = context.getOptions().get(ASSERT_OPTIONS_KEY); + String value = context.getOptions().get(ASSERT_OPTIONS_VALUE); + checkArgument( + key.equals(value), + String.format( + "assert key and value not match! key = %s, value = %s", key, value)); + } return () -> new InMemorySink(context.getCatalogTable(), context.getOptions()); } } diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SinkPlaceholderIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SinkPlaceholderIT.java new file mode 100644 index 00000000000..eee3705452f --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SinkPlaceholderIT.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.e2e; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.Container; + +import java.io.IOException; + +public class SinkPlaceholderIT extends SeaTunnelContainer { + + @Test + public void testSinkPlaceholder() throws IOException, InterruptedException { + Container.ExecResult execResult = + executeSeaTunnelJob("/fake_to_inmemory_with_sink_placeholder.conf"); + Assertions.assertNotEquals(0, execResult.getExitCode()); + } +} diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/fake_to_inmemory_with_sink_placeholder.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/fake_to_inmemory_with_sink_placeholder.conf new file mode 100644 index 00000000000..5263d4492cf --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/fake_to_inmemory_with_sink_placeholder.conf @@ -0,0 +1,77 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + tables_configs = [ + { + schema = { + table = "test_db1.test_schema1.test_table1" + columns = [ + { + name = id + type = bigint + } + { + name = name + type = string + } + { + name = age + type = int + } + ] + primaryKey = { + name = "primary key" + columnNames = ["id", "name"] + } + constraintKeys = [ + { + constraintName = "unique_name" + constraintType = UNIQUE_KEY + constraintColumns = [ + { + columnName = "id" + sortType = ASC + }, + { + columnName = "name" + sortType = ASC + } + ] + } + ] + } + } + ] + } +} + +sink { + InMemory { + assert_options_key = "database=${database_name}, schema=${schema_name}, schema_full_name=${schema_full_name}, table=${table_name}, table_full_name=${table_full_name}, primary_key=${primary_key}, unique_key=${unique_key}, field_names=${field_names}" + assert_options_value = "database=test_db1, schema=test_schema1, schema_full_name=test_db1.test_schema1, table=test_table1, table_full_name=test_db1.test_schema1.test_table1, primary_key=id,name, unique_key=id,name, field_names=id,name,age" + } +} \ No newline at end of file