From 40d4d764c62ace9f716cfa55fecdacc5b0bec7cb Mon Sep 17 00:00:00 2001
From: hailin0
Date: Tue, 2 Jul 2024 00:44:08 +0800
Subject: [PATCH] [Improve][MultiTableSink] Refactor multi-table configurations

---
 docs/en/concept/connector-v2-features.md      |  14 ++
 docs/en/connector-v2/sink/Doris.md            |  90 +++++++
 docs/en/connector-v2/sink/Hive.md             |   1 +
 docs/en/connector-v2/sink/Http.md             |  70 ++++++
 docs/en/connector-v2/sink/Iceberg.md          |  75 ++++++
 docs/en/connector-v2/sink/InfluxDB.md         |  34 +++
 docs/en/connector-v2/sink/Kudu.md             | 106 ++++----
 docs/en/connector-v2/sink/OssFile.md          |   2 +-
 docs/en/connector-v2/sink/Paimon.md           |  47 +++-
 docs/en/connector-v2/sink/S3File.md           |  54 ++--
 docs/en/connector-v2/sink/StarRocks.md        |  84 +++++++
 .../api/configuration/ReadonlyConfig.java     |   4 +
 .../api/sink/SinkReplaceNameConstant.java     |   2 +
 .../seatunnel/api/sink/TablePlaceholder.java  | 238 ++++++++++++++++++
 .../api/table/factory/FactoryUtil.java        |  14 +-
 .../doris/sink/DorisSinkFactory.java          |  20 +-
 .../sink/ElasticsearchSinkFactory.java        |  50 +---
 .../BaseMultipleTableFileSinkFactory.java     |  66 +----
 .../file/local/sink/LocalFileSinkFactory.java |   5 +-
 .../file/oss/sink/OssFileSinkFactory.java     |   5 +-
 .../file/s3/sink/S3FileSinkFactory.java       |  29 +--
 .../seatunnel/hive/sink/HiveSinkFactory.java  |  53 +---
 .../iceberg/sink/IcebergSinkFactory.java      |  23 +-
 .../seatunnel/jdbc/sink/JdbcSinkFactory.java  |   2 +-
 .../seatunnel/paimon/config/PaimonConfig.java |   5 +-
 .../paimon/sink/PaimonSinkFactory.java        |  23 +-
 .../starrocks/sink/StarRocksSinkFactory.java  |  49 +---
 .../flink/execution/SinkExecuteProcessor.java |  12 +-
 .../flink/execution/SinkExecuteProcessor.java |  12 +-
 .../spark/execution/SinkExecuteProcessor.java |  12 +-
 .../spark/execution/SinkExecuteProcessor.java |  12 +-
 .../ConnectorSpecificationCheckTest.java      |  49 +++-
 32 files changed, 853 insertions(+), 409 deletions(-)
 create mode 100644 seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/TablePlaceholder.java

diff --git a/docs/en/concept/connector-v2-features.md b/docs/en/concept/connector-v2-features.md
index 7eb3cd48752d..89e81637589a 100644
--- a/docs/en/concept/connector-v2-features.md
+++ b/docs/en/concept/connector-v2-features.md
@@ -69,3 +69,17 @@ For sink connector, the sink connector supports exactly-once if any piece of dat
 ### cdc(change data capture)
 
 If a sink connector supports writing row kinds(INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE) based on primary key, we think it supports cdc(change data capture).
+
+### support multiple table write
+
+Supports writing multiple tables in one SeaTunnel job.
+
+Requires:
+1. Support the APIs `SupportMultiTableSink` and `SupportMultiTableSinkWriter`.
+2. Support the following table identifier placeholders in sink options:
+- `${database_name}` Replaced with the database name of the upstream catalog table
+- `${schema_name}` Replaced with the schema name of the upstream catalog table
+- `${table_name}` Replaced with the table name of the upstream catalog table
+- `${primary_key}` Replaced with the primary-key field names of the upstream catalog table
+- `${unique_key}` Replaced with the unique-key field names of the upstream catalog table
+- `${field_names}` Replaced with all field names of the upstream catalog table
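+
+For example, a minimal multi-table sink sketch using these placeholders (illustrative only; the exact option names vary by connector, see each sink's `Multiple table` section):
+
+```hocon
+sink {
+  Jdbc {
+    database = "${database_name}_test"
+    table = "${schema_name:public}.${table_name}"
+  }
+}
+```
+
+A placeholder may carry an inline default after a colon (e.g. `${schema_name:public}`), which is used when the upstream table does not define that part.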
"root" + password = "******" + + table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"] + } +} + +transform { +} + +sink { + Http { + ... + url = "http://localhost/test/${database_name}_test/${table_name}_test" + } +} +``` + +#### example2 + +```hocon +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + Jdbc { + driver = oracle.jdbc.driver.OracleDriver + url = "jdbc:oracle:thin:@localhost:1521/XE" + user = testUser + password = testPassword + + table_list = [ + { + table_path = "TESTSCHEMA.TABLE_1" + }, + { + table_path = "TESTSCHEMA.TABLE_2" + } + ] + } +} + +transform { +} + +sink { + Http { + ... + url = "http://localhost/test/${schema_name}_test/${table_name}_test" + } +} +``` + ## Changelog ### 2.2.0-beta 2022-09-26 diff --git a/docs/en/connector-v2/sink/Iceberg.md b/docs/en/connector-v2/sink/Iceberg.md index 3aa24a0a636a..721c5ea7c08b 100644 --- a/docs/en/connector-v2/sink/Iceberg.md +++ b/docs/en/connector-v2/sink/Iceberg.md @@ -16,6 +16,10 @@ Sink connector for Apache Iceberg. It can support cdc mode 、auto create table and table schema evolution. +## Key features + +- [x] [support multiple table write](../../concept/connector-v2-features.md) + ## Supported DataSource Info | Datasource | Dependent | Maven | @@ -173,6 +177,77 @@ sink { ``` +### Multiple table + +#### example1 + +```hocon +env { + parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 +} + +source { + Mysql-CDC { + base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel" + username = "root" + password = "******" + + table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"] + } +} + +transform { +} + +sink { + Iceberg { + ... + namespace = "${database_name}_test" + table = "${table_name}_test" + } +} +``` + +#### example2 + +```hocon +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + Jdbc { + driver = oracle.jdbc.driver.OracleDriver + url = "jdbc:oracle:thin:@localhost:1521/XE" + user = testUser + password = testPassword + + table_list = [ + { + table_path = "TESTSCHEMA.TABLE_1" + }, + { + table_path = "TESTSCHEMA.TABLE_2" + } + ] + } +} + +transform { +} + +sink { + Iceberg { + ... + namespace = "${schema_name}_test" + table = "${table_name}_test" + } +} +``` + ## Changelog ### 2.3.4-SNAPSHOT 2024-01-18 diff --git a/docs/en/connector-v2/sink/InfluxDB.md b/docs/en/connector-v2/sink/InfluxDB.md index 1dba1fbe4dc8..e899840b0fa2 100644 --- a/docs/en/connector-v2/sink/InfluxDB.md +++ b/docs/en/connector-v2/sink/InfluxDB.md @@ -9,6 +9,7 @@ Write data to InfluxDB. 
 ## Key features
 
 - [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [support multiple table write](../../concept/connector-v2-features.md)
 
 ## Options
@@ -100,6 +101,39 @@ sink {
 }
 ```
 
+### Multiple table
+
+#### example1
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+}
+
+source {
+  Mysql-CDC {
+    base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
+    username = "root"
+    password = "******"
+
+    table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"]
+  }
+}
+
+transform {
+}
+
+sink {
+  InfluxDB {
+    url = "http://influxdb-host:8086"
+    database = "test"
+    measurement = "${table_name}_test"
+  }
+}
+```
+
 ## Changelog
 
 ### next version
diff --git a/docs/en/connector-v2/sink/Kudu.md b/docs/en/connector-v2/sink/Kudu.md
index aa43a72522dd..aea1a917fb19 100644
--- a/docs/en/connector-v2/sink/Kudu.md
+++ b/docs/en/connector-v2/sink/Kudu.md
@@ -16,6 +16,7 @@
 
 - [ ] [exactly-once](../../concept/connector-v2-features.md)
 - [x] [cdc](../../concept/connector-v2-features.md)
+- [x] [support multiple table write](../../concept/connector-v2-features.md)
 
 ## Data Type Mapping
 
@@ -123,75 +124,72 @@ sink {
 }
 ```
 
-### Multiple Table
+### Multiple table
+
+#### example1
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+}
+
+source {
+  Mysql-CDC {
+    base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
+    username = "root"
+    password = "******"
+
+    table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"]
+  }
+}
+
+transform {
+}
+
+sink {
+  kudu{
+    kudu_masters = "kudu-master-cdc:7051"
+    table_name = "${database_name}_${table_name}_test"
+  }
+}
+```
+
+#### example2
 
 ```hocon
 env {
-  # You can set engine configuration here
   parallelism = 1
   job.mode = "BATCH"
 }
 
 source {
-  FakeSource {
-    tables_configs = [
+  Jdbc {
+    driver = oracle.jdbc.driver.OracleDriver
+    url = "jdbc:oracle:thin:@localhost:1521/XE"
+    user = testUser
+    password = testPassword
+
+    table_list = [
       {
-        schema = {
-          table = "kudu_sink_1"
-          fields {
-            id = int
-            val_bool = boolean
-            val_int8 = tinyint
-            val_int16 = smallint
-            val_int32 = int
-            val_int64 = bigint
-            val_float = float
-            val_double = double
-            val_decimal = "decimal(16, 1)"
-            val_string = string
-            val_unixtime_micros = timestamp
-          }
-        }
-        rows = [
-          {
-            kind = INSERT
-            fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
-          }
-        ]
+        table_path = "TESTSCHEMA.TABLE_1"
       },
       {
-        schema = {
-          table = "kudu_sink_2"
-          fields {
-            id = int
-            val_bool = boolean
-            val_int8 = tinyint
-            val_int16 = smallint
-            val_int32 = int
-            val_int64 = bigint
-            val_float = float
-            val_double = double
-            val_decimal = "decimal(16, 1)"
-            val_string = string
-            val_unixtime_micros = timestamp
-          }
-        }
-        rows = [
-          {
-            kind = INSERT
-            fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
-          }
-        ]
+        table_path = "TESTSCHEMA.TABLE_2"
       }
     ]
   }
 }
 
+transform {
+}
+
 sink {
   kudu{
-    kudu_masters = "kudu-master-multiple:7051"
+    kudu_masters = "kudu-master-cdc:7051"
+    table_name = "${schema_name}_${table_name}_test"
   }
 }
 ```
diff --git a/docs/en/connector-v2/sink/OssFile.md b/docs/en/connector-v2/sink/OssFile.md
index aef2bb11c096..f83fdcf49973 100644
--- a/docs/en/connector-v2/sink/OssFile.md
+++ b/docs/en/connector-v2/sink/OssFile.md
@@ -22,6 +22,7 @@
 ## Key features
 
 - [x] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [support multiple table write](../../concept/connector-v2-features.md)
 
 By default, we use 2PC commit to ensure `exactly-once`
 
@@ -509,7 +510,6 @@ sink {
     compress_codec = "lzo"
   }
 }
-
 ```
 
 ## Changelog
diff --git a/docs/en/connector-v2/sink/Paimon.md b/docs/en/connector-v2/sink/Paimon.md
index d79d7c9b0044..58978cc20c22 100644
--- a/docs/en/connector-v2/sink/Paimon.md
+++ b/docs/en/connector-v2/sink/Paimon.md
@@ -27,6 +27,7 @@ libfb303-xxx.jar
 ## Key features
 
 - [x] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [support multiple table write](../../concept/connector-v2-features.md)
 
 ## Options
 
@@ -242,6 +243,8 @@ sink {
 
 ### Multiple table
 
+#### example1
+
 ```hocon
 env {
   parallelism = 1
@@ -254,6 +257,7 @@ source {
     base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
     username = "root"
     password = "******"
+
     table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"]
   }
 }
@@ -265,8 +269,47 @@ sink {
   Paimon {
     catalog_name="seatunnel_test"
     warehouse="file:///tmp/seatunnel/paimon/hadoop-sink/"
-    database="${database_name}"
-    table="${table_name}"
+    database="${database_name}_test"
+    table="${table_name}_test"
+  }
+}
+```
+
+#### example2
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Jdbc {
+    driver = oracle.jdbc.driver.OracleDriver
+    url = "jdbc:oracle:thin:@localhost:1521/XE"
+    user = testUser
+    password = testPassword
+
+    table_list = [
+      {
+        table_path = "TESTSCHEMA.TABLE_1"
+      },
+      {
+        table_path = "TESTSCHEMA.TABLE_2"
+      }
+    ]
+  }
+}
+
+transform {
+}
+
+sink {
+  Paimon {
+    catalog_name="seatunnel_test"
+    warehouse="file:///tmp/seatunnel/paimon/hadoop-sink/"
+    database="${schema_name}_test"
+    table="${table_name}_test"
   }
 }
 ```
diff --git a/docs/en/connector-v2/sink/S3File.md b/docs/en/connector-v2/sink/S3File.md
index c25975a86033..cb711f6b3b77 100644
--- a/docs/en/connector-v2/sink/S3File.md
+++ b/docs/en/connector-v2/sink/S3File.md
@@ -12,6 +12,7 @@
 
 - [x] [exactly-once](../../concept/connector-v2-features.md)
 - [ ] [cdc](../../concept/connector-v2-features.md)
+- [x] [support multiple table write](../../concept/connector-v2-features.md)
 
 By default, we use 2PC commit to ensure `exactly-once`
 
@@ -445,45 +446,34 @@ For orc file format simple config with `org.apache.hadoop.fs.s3a.SimpleAWSCreden
 
 Multi-table writing and saveMode
 
-```
+```hocon
 env {
-"job.name"="SeaTunnel_job"
-"job.mode"=STREAMING
+  "job.name"="SeaTunnel_job"
+  "job.mode"=STREAMING
 }
 source {
-MySQL-CDC {
-
-  "connect.max-retries"=3
-  "connection.pool.size"=6
-  "startup.mode"=INITIAL
-  "exactly_once"="true"
-  "stop.mode"=NEVER
-  parallelism=1
-  "result_table_name"=Table11519548644512
-  "dag-parsing.mode"=MULTIPLEX
-  catalog {
-    factory=Mysql
-  }
-  database-names=[
-    "wls_t1"
-  ]
-  table-names=[
-    "wls_t1.mysqlcdc_to_s3_t3",
-    "wls_t1.mysqlcdc_to_s3_t4",
-    "wls_t1.mysqlcdc_to_s3_t5",
-    "wls_t1.mysqlcdc_to_s3_t1",
-    "wls_t1.mysqlcdc_to_s3_t2"
-  ]
-  password="xxxxxx"
-  username="xxxxxxxxxxxxx"
-  base-url="jdbc:mysql://localhost:3306/qa_source"
-  server-time-zone=UTC
-}
+  MySQL-CDC {
+    database-names=[
+      "wls_t1"
+    ]
+    table-names=[
+      "wls_t1.mysqlcdc_to_s3_t3",
+      "wls_t1.mysqlcdc_to_s3_t4",
+      "wls_t1.mysqlcdc_to_s3_t5",
+      "wls_t1.mysqlcdc_to_s3_t1",
+      "wls_t1.mysqlcdc_to_s3_t2"
+    ]
+    password="xxxxxx"
+    username="xxxxxxxxxxxxx"
+    base-url="jdbc:mysql://localhost:3306/qa_source"
+  }
 }
+
 transform {
 }
+
 sink {
-S3File {
+  S3File {
     bucket = "s3a://seatunnel-test"
     tmp_path = "/tmp/seatunnel/${table_name}"
     path="/test/${table_name}"
diff --git a/docs/en/connector-v2/sink/StarRocks.md b/docs/en/connector-v2/sink/StarRocks.md
index b6dc18e8eab8..c3c14c7a3627 100644
--- a/docs/en/connector-v2/sink/StarRocks.md
+++ b/docs/en/connector-v2/sink/StarRocks.md
@@ -12,6 +12,7 @@
 
 - [ ] [exactly-once](../../concept/connector-v2-features.md)
 - [x] [cdc](../../concept/connector-v2-features.md)
+- [x] [support multiple table write](../../concept/connector-v2-features.md)
 
 ## Description
 
@@ -283,6 +284,89 @@ sink {
 }
 ```
 
+### Multiple table
+
+#### example1
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+}
+
+source {
+  Mysql-CDC {
+    base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
+    username = "root"
+    password = "******"
+
+    table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"]
+  }
+}
+
+transform {
+}
+
+sink {
+  StarRocks {
+    nodeUrls = ["e2e_starRocksdb:8030"]
+    username = root
+    password = ""
+    database = "${database_name}_test"
+    table = "${table_name}_test"
+    ...
+
+    // Support upsert/delete event synchronization (enable_upsert_delete=true), only supports PrimaryKey model.
+    enable_upsert_delete = true
+  }
+}
+```
+
+#### example2
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Jdbc {
+    driver = oracle.jdbc.driver.OracleDriver
+    url = "jdbc:oracle:thin:@localhost:1521/XE"
+    user = testUser
+    password = testPassword
+
+    table_list = [
+      {
+        table_path = "TESTSCHEMA.TABLE_1"
+      },
+      {
+        table_path = "TESTSCHEMA.TABLE_2"
+      }
+    ]
+  }
+}
+
+transform {
+}
+
+sink {
+  StarRocks {
+    nodeUrls = ["e2e_starRocksdb:8030"]
+    username = root
+    password = ""
+    database = "${schema_name}_test"
+    table = "${table_name}_test"
+    ...
+
+    // Support upsert/delete event synchronization (enable_upsert_delete=true), only supports PrimaryKey model.
+    enable_upsert_delete = true
+  }
+}
+```
+
 ## Changelog
 
 ### next version
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java
index 0ac179ae7645..485baceb4f9d 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java
@@ -144,6 +144,10 @@ private Object getValue(String key) {
         }
     }
 
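+    /**
+     * Returns a mutable copy of the underlying option map, so that callers (for example the new
+     * table placeholder handling) can rewrite option values and rebuild a config without mutating
+     * this instance.
+     */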
+    public Map<String, Object> copyData() {
+        return new LinkedHashMap<>(this.confData);
+    }
+
     @Override
     public int hashCode() {
         int hash = 0;
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkReplaceNameConstant.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkReplaceNameConstant.java
index f3bc08b0e1f6..0291c2760cc6 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkReplaceNameConstant.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkReplaceNameConstant.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.api.sink;
 
+/** @deprecated replaced by {@link TablePlaceholder}; TODO: remove this class */
+@Deprecated
 public final class SinkReplaceNameConstant {
 
     public static final String REPLACE_TABLE_NAME_KEY = "${table_name}";
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/TablePlaceholder.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/TablePlaceholder.java
new file mode 100644
index 000000000000..548bcd7444a6
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/TablePlaceholder.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.sink;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.RequiredOption;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public class TablePlaceholder {
+    // Placeholder ${database_name} or ${database_name:default_value}
+    public static final String REPLACE_DATABASE_NAME_KEY = "database_name";
+    // Placeholder ${schema_name} or ${schema_name:default_value}
+    public static final String REPLACE_SCHEMA_NAME_KEY = "schema_name";
+    // Placeholder ${schema_full_name} or ${schema_full_name:default_value}
+    public static final String REPLACE_SCHEMA_FULL_NAME_KEY = "schema_full_name";
+    // Placeholder ${table_name} or ${table_name:default_value}
+    public static final String REPLACE_TABLE_NAME_KEY = "table_name";
+    // Placeholder ${table_full_name} or ${table_full_name:default_value}
+    public static final String REPLACE_TABLE_FULL_NAME_KEY = "table_full_name";
+    // Placeholder ${primary_key} or ${primary_key:default_value}
+    public static final String REPLACE_PRIMARY_KEY = "primary_key";
+    // Placeholder ${unique_key} or ${unique_key:default_value}
+    public static final String REPLACE_UNIQUE_KEY = "unique_key";
+    // Placeholder ${field_names} or ${field_names:default_value}
+    public static final String REPLACE_FIELD_NAMES_KEY = "field_names";
+    public static final String NAME_DELIMITER = ".";
+    public static final String FIELD_DELIMITER = ",";
+
+    private static String replacePlaceholders(String input, String placeholderName, String value) {
+        return replacePlaceholders(input, placeholderName, value, null);
+    }
+
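+    // Replaces every ${placeholderName} or ${placeholderName:inline_default} token in the input:
+    // a non-empty value wins, then the inline default, then defaultValue; if all three are
+    // missing the token is left untouched. E.g. "${table_name:users}_test" becomes "orders_test"
+    // for table "orders", or "users_test" when no table name is available.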
+    private static String replacePlaceholders(
+            String input, String placeholderName, String value, String defaultValue) {
+        String placeholderRegex = "\\$\\{" + Pattern.quote(placeholderName) + "(:[^}]*)?\\}";
+        Pattern pattern = Pattern.compile(placeholderRegex);
+        Matcher matcher = pattern.matcher(input);
+
+        StringBuffer result = new StringBuffer();
+        while (matcher.find()) {
+            String replacement =
+                    value != null && !value.isEmpty()
+                            ? value
+                            : (matcher.group(1) != null
+                                    ? matcher.group(1).substring(1)
+                                    : defaultValue);
+            if (replacement == null) {
+                continue;
+            }
+            matcher.appendReplacement(result, Matcher.quoteReplacement(replacement));
+        }
+        matcher.appendTail(result);
+        return result.toString();
+    }
+
+    private static String replaceTableIdentifier(
+            String placeholder, TableIdentifier identifier, String defaultValue) {
+        placeholder =
+                replacePlaceholders(
+                        placeholder,
+                        REPLACE_DATABASE_NAME_KEY,
+                        identifier.getDatabaseName(),
+                        defaultValue);
+        placeholder =
+                replacePlaceholders(
+                        placeholder,
+                        REPLACE_SCHEMA_NAME_KEY,
+                        identifier.getSchemaName(),
+                        defaultValue);
+        placeholder =
+                replacePlaceholders(
+                        placeholder,
+                        REPLACE_TABLE_NAME_KEY,
+                        identifier.getTableName(),
+                        defaultValue);
+
+        List<String> fullPath = new ArrayList<>();
+        if (identifier.getDatabaseName() != null) {
+            fullPath.add(identifier.getDatabaseName());
+        }
+        if (identifier.getSchemaName() != null) {
+            fullPath.add(identifier.getSchemaName());
+        }
+        if (!fullPath.isEmpty()) {
+            placeholder =
+                    replacePlaceholders(
+                            placeholder,
+                            REPLACE_SCHEMA_FULL_NAME_KEY,
+                            String.join(NAME_DELIMITER, fullPath),
+                            defaultValue);
+        }
+
+        if (identifier.getTableName() != null) {
+            fullPath.add(identifier.getTableName());
+        }
+        if (!fullPath.isEmpty()) {
+            placeholder =
+                    replacePlaceholders(
+                            placeholder,
+                            REPLACE_TABLE_FULL_NAME_KEY,
+                            String.join(NAME_DELIMITER, fullPath),
+                            defaultValue);
+        }
+        return placeholder;
+    }
+
+    public static String replaceTableIdentifier(String placeholder, TableIdentifier identifier) {
+        return replaceTableIdentifier(placeholder, identifier, "");
+    }
+
+    public static String replaceTablePrimaryKey(String placeholder, PrimaryKey primaryKey) {
+        if (primaryKey != null && !primaryKey.getColumnNames().isEmpty()) {
+            String pkFieldsString = String.join(FIELD_DELIMITER, primaryKey.getColumnNames());
+            return replacePlaceholders(placeholder, REPLACE_PRIMARY_KEY, pkFieldsString);
+        }
+        return placeholder;
+    }
+
+    public static String replaceTableUniqueKey(
+            String placeholder, List<ConstraintKey> constraintKeys) {
+        Optional<String> ukFieldsString =
+                constraintKeys.stream()
+                        .filter(
+                                e ->
+                                        e.getConstraintType()
+                                                .equals(ConstraintKey.ConstraintType.UNIQUE_KEY))
+                        .findFirst()
+                        .map(
+                                e ->
+                                        e.getColumnNames().stream()
+                                                .map(f -> f.getColumnName())
+                                                .collect(Collectors.joining(FIELD_DELIMITER)));
+        if (ukFieldsString.isPresent()) {
+            return replacePlaceholders(placeholder, REPLACE_UNIQUE_KEY, ukFieldsString.get());
+        }
+        return placeholder;
+    }
+
+    public static String replaceTableFieldNames(String placeholder, TableSchema schema) {
+        return replacePlaceholders(
+                placeholder,
+                REPLACE_FIELD_NAMES_KEY,
+                String.join(FIELD_DELIMITER, schema.getFieldNames()));
+    }
+
+    public static ReadonlyConfig replaceTablePlaceholder(
+            ReadonlyConfig config, CatalogTable table, Option<?>... options) {
+        return replaceTablePlaceholder(config, table, new HashSet<>(Arrays.asList(options)));
+    }
+
+    public static ReadonlyConfig replaceTablePlaceholder(
+            ReadonlyConfig config, CatalogTable table, Collection
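
Below is a short, hypothetical usage sketch of the helpers above (it assumes the four-argument `TableIdentifier.of(catalog, database, schema, table)` factory from the SeaTunnel catalog API; names and values are illustrative):

```java
import org.apache.seatunnel.api.sink.TablePlaceholder;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;

public class TablePlaceholderSketch {
    public static void main(String[] args) {
        // Upstream table "user" in database "seatunnel", with no schema.
        TableIdentifier id = TableIdentifier.of("my_catalog", "seatunnel", null, "user");

        // ${database_name} resolves from the identifier -> "seatunnel_test".
        System.out.println(TablePlaceholder.replaceTableIdentifier("${database_name}_test", id));

        // ${schema_name} is null, so the inline default "public" is used -> "public.user".
        System.out.println(
                TablePlaceholder.replaceTableIdentifier("${schema_name:public}.${table_name}", id));
    }
}
```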