diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/pom.xml
new file mode 100644
index 00000000000..feebe15be14
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/pom.xml
@@ -0,0 +1,108 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.inlong</groupId>
+        <artifactId>sort-connectors-v1.15</artifactId>
+        <version>1.13.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>sort-connector-jdbc-v1.15</artifactId>
+    <name>Apache InLong - Sort-connector-jdbc</name>
+    <properties>
+        <inlong.root.dir>${project.parent.parent.parent.parent.parent.basedir}</inlong.root.dir>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-flink-dependencies-v1.15</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>audit-sdk</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>ru.yandex.clickhouse</groupId>
+            <artifactId>clickhouse-jdbc</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-jdbc</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>shade-flink</id>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                                <filter>
+                                    <artifact>org.apache.inlong:sort-connector-*</artifact>
+                                    <includes>
+                                        <include>org/apache/inlong/**</include>
+                                        <include>META-INF/services/org.apache.flink.table.factories.Factory</include>
+                                        <include>META-INF/services/org.apache.inlong.sort.jdbc.dialect.JdbcDialectFactory</include>
+                                    </includes>
+                                </filter>
+                            </filters>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/clickhouse/ClickHouseRowConverter.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/clickhouse/ClickHouseRowConverter.java
new file mode 100644
index 00000000000..61fb53217d7
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/clickhouse/ClickHouseRowConverter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.converter.clickhouse;
+
+import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter;
+import org.apache.flink.table.types.logical.RowType;
+
+/**
+ * Runtime converter that is responsible for converting between JDBC objects and Flink internal objects for ClickHouse.
+ */
+public class ClickHouseRowConverter extends AbstractJdbcRowConverter {
+
+ private static final long serialVersionUID = 1L;
+
+ public ClickHouseRowConverter(RowType rowType) {
+ super(rowType);
+ }
+
+ @Override
+ public String converterName() {
+ return "ClickHouse";
+ }
+}
\ No newline at end of file
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/JdbcDialectFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/JdbcDialectFactory.java
new file mode 100644
index 00000000000..36ae8e207ce
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/JdbcDialectFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.dialect;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+
+/**
+ * A factory to create a specific {@link JdbcDialect}. This factory is used with Java's Service
+ * Provider Interfaces (SPI) for discovery.
+ *
+ * <p>Classes that implement this interface can be added to the
+ * "META-INF/services/org.apache.inlong.sort.jdbc.dialect.JdbcDialectFactory" file of a JAR file
+ * in the current classpath to be found.
+ *
+ * @see JdbcDialect
+ * Copied from {@link org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory},
+ * not modified.
+ */
+@PublicEvolving
+public interface JdbcDialectFactory {
+
+    /**
+     * Retrieves whether the dialect thinks that it can open a connection to the given URL.
+     * Typically, dialects will return <code>true</code> if they understand the sub-protocol
+     * specified in the URL and <code>false</code> if they do not.
+     *
+     * @param url the URL of the database
+     * @return <code>true</code> if this dialect understands the given URL; <code>false</code>
+     *         otherwise.
+     */
+ boolean acceptsURL(String url);
+
+    /** @return a new instance of the {@link JdbcDialect}. */
+ JdbcDialect create();
+}
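
Implementations of this interface are discovered through Java's ServiceLoader, so each dialect module ships a one-line registration resource. A minimal sketch for the ClickHouse factory added later in this patch (the path matches the include in the shade configuration above):

```text
# src/main/resources/META-INF/services/org.apache.inlong.sort.jdbc.dialect.JdbcDialectFactory
org.apache.inlong.sort.jdbc.dialect.clickhouse.ClickHouseDialectFactory
```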
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/JdbcDialectLoader.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/JdbcDialectLoader.java
new file mode 100644
index 00000000000..42eb3f1b2ac
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/JdbcDialectLoader.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.dialect;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for working with {@link JdbcDialect}.
+ * Copied from {@link org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader},
+ * not modified.
+ */
+@Internal
+public final class JdbcDialectLoader {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JdbcDialectLoader.class);
+
+ private JdbcDialectLoader() {
+ }
+
+ /**
+ * Loads the unique JDBC Dialect that can handle the given database url.
+ *
+ * @param url A database URL.
+ * @throws IllegalStateException if the loader cannot find exactly one dialect that can
+ * unambiguously process the given database URL.
+ * @return The loaded dialect.
+ */
+ public static JdbcDialect load(String url) {
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+
+        List<JdbcDialectFactory> foundFactories = discoverFactories(cl);
+
+ if (foundFactories.isEmpty()) {
+ throw new IllegalStateException(
+ String.format(
+ "Could not find any jdbc dialect factories that implement '%s' in the classpath.",
+ JdbcDialectFactory.class.getName()));
+ }
+
+        final List<JdbcDialectFactory> matchingFactories =
+ foundFactories.stream().filter(f -> f.acceptsURL(url)).collect(Collectors.toList());
+
+ if (matchingFactories.isEmpty()) {
+ throw new IllegalStateException(
+ String.format(
+ "Could not find any jdbc dialect factory that can handle url '%s' that implements '%s' in the classpath.\n\n"
+ + "Available factories are:\n\n"
+ + "%s",
+ url,
+ JdbcDialectFactory.class.getName(),
+ foundFactories.stream()
+ .map(f -> f.getClass().getName())
+ .distinct()
+ .sorted()
+ .collect(Collectors.joining("\n"))));
+ }
+ if (matchingFactories.size() > 1) {
+ throw new IllegalStateException(
+ String.format(
+ "Multiple jdbc dialect factories can handle url '%s' that implement '%s' found in the classpath.\n\n"
+ + "Ambiguous factory classes are:\n\n"
+ + "%s",
+ url,
+ JdbcDialectFactory.class.getName(),
+ matchingFactories.stream()
+ .map(f -> f.getClass().getName())
+ .sorted()
+ .collect(Collectors.joining("\n"))));
+ }
+
+ return matchingFactories.get(0).create();
+ }
+
+    private static List<JdbcDialectFactory> discoverFactories(ClassLoader classLoader) {
+ try {
+            final List<JdbcDialectFactory> result = new LinkedList<>();
+ ServiceLoader.load(JdbcDialectFactory.class, classLoader)
+ .iterator()
+ .forEachRemaining(result::add);
+ return result;
+ } catch (ServiceConfigurationError e) {
+ LOG.error("Could not load service provider for jdbc dialects factory.", e);
+ throw new RuntimeException(
+ "Could not load service provider for jdbc dialects factory.", e);
+ }
+ }
+}
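
As a usage illustration, resolving a dialect from a JDBC URL is a single call; this sketch assumes the ClickHouse factory is registered via SPI and uses a placeholder URL:

```java
import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
import org.apache.inlong.sort.jdbc.dialect.JdbcDialectLoader;

public class DialectLoaderExample {

    public static void main(String[] args) {
        // Resolves the single registered factory whose acceptsURL() matches, then creates the dialect.
        JdbcDialect dialect = JdbcDialectLoader.load("jdbc:clickhouse://localhost:8123/default");
        // Prints "ClickHouse" when ClickHouseDialectFactory is on the classpath.
        System.out.println(dialect.dialectName());
    }
}
```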
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/clickhouse/ClickHouseDialect.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/clickhouse/ClickHouseDialect.java
new file mode 100644
index 00000000000..738e2aaad1e
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/clickhouse/ClickHouseDialect.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.dialect.clickhouse;
+
+import org.apache.inlong.sort.jdbc.converter.clickhouse.ClickHouseRowConverter;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
+import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+
+/**
+ * JDBC dialect for ClickHouse SQL.
+ */
+public class ClickHouseDialect extends AbstractDialect {
+
+ public static final Logger LOG = LoggerFactory.getLogger(ClickHouseDialect.class);
+
+ // Define MAX/MIN precision of TIMESTAMP type according to ClickHouse docs:
+ // https://clickhouse.com/docs/zh/sql-reference/data-types/datetime64
+ private static final int MAX_TIMESTAMP_PRECISION = 8;
+ private static final int MIN_TIMESTAMP_PRECISION = 0;
+
+ // Define MAX/MIN precision of DECIMAL type according to ClickHouse docs:
+ // https://clickhouse.com/docs/zh/sql-reference/data-types/decimal/
+ private static final int MAX_DECIMAL_PRECISION = 128;
+ private static final int MIN_DECIMAL_PRECISION = 32;
+ private static final String POINT = ".";
+
+ @Override
+ public String dialectName() {
+ return "ClickHouse";
+ }
+
+ @Override
+ public JdbcRowConverter getRowConverter(RowType rowType) {
+ return new ClickHouseRowConverter(rowType);
+ }
+
+ @Override
+ public String getLimitClause(long limit) {
+ return "LIMIT " + limit;
+ }
+
+ @Override
+    public Optional<String> defaultDriverName() {
+ return Optional.of("ru.yandex.clickhouse.ClickHouseDriver");
+ }
+
+ @Override
+ public String quoteIdentifier(String identifier) {
+ return "`" + identifier + "`";
+ }
+
+ @Override
+    public Optional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields) {
+ return Optional.empty();
+ }
+
+ @Override
+    public Optional<Range> decimalPrecisionRange() {
+ return Optional.of(Range.of(MIN_DECIMAL_PRECISION, MAX_DECIMAL_PRECISION));
+ }
+
+ /**
+ * Defines the supported types for the dialect.
+ *
+ * @return a set of logical type roots.
+ */
+ @Override
+    public Set<LogicalTypeRoot> supportedTypes() {
+        return new HashSet<LogicalTypeRoot>() {
+
+ {
+ add(LogicalTypeRoot.CHAR);
+ add(LogicalTypeRoot.VARCHAR);
+ add(LogicalTypeRoot.BOOLEAN);
+ add(LogicalTypeRoot.DECIMAL);
+ add(LogicalTypeRoot.TINYINT);
+ add(LogicalTypeRoot.SMALLINT);
+ add(LogicalTypeRoot.INTEGER);
+ add(LogicalTypeRoot.BIGINT);
+ add(LogicalTypeRoot.FLOAT);
+ add(LogicalTypeRoot.DOUBLE);
+ add(LogicalTypeRoot.DATE);
+ add(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE);
+ add(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
+ }
+ };
+
+ }
+
+ @Override
+    public Optional<Range> timestampPrecisionRange() {
+ return Optional.of(Range.of(MIN_TIMESTAMP_PRECISION, MAX_TIMESTAMP_PRECISION));
+ }
+
+ /**
+ * Get update one row statement by condition fields
+ */
+ @Override
+ public String getUpdateStatement(
+ String tableName, String[] fieldNames, String[] conditionFields) {
+        List<String> conditionFieldList = Arrays.asList(conditionFields);
+ String setClause =
+ Arrays.stream(fieldNames)
+ .filter(fieldName -> !conditionFieldList.contains(fieldName))
+ .map(f -> format("%s = :%s", quoteIdentifier(f), f))
+ .collect(Collectors.joining(", "));
+
+ String conditionClause =
+ Arrays.stream(conditionFields)
+ .map(f -> format("%s = :%s", quoteIdentifier(f), f))
+ .collect(Collectors.joining(" AND "));
+        Pair<String, String> databaseAndTableName = getDatabaseAndTableName(tableName);
+ return "ALTER TABLE "
+ + quoteIdentifier(databaseAndTableName.getLeft())
+ + POINT
+ + quoteIdentifier(databaseAndTableName.getRight())
+ + " UPDATE "
+ + setClause
+ + " WHERE "
+ + conditionClause;
+ }
+
+    /**
+     * ClickHouse throws an exception like "Table default.test_user doesn't exist" even though the
+     * JDBC URL contains the database name, so we qualify the table with its database when
+     * executing queries. This method parses tableName into its database and table parts.
+     * @param tableName table name, optionally qualified as database.table
+     * @return pair whose left is the database and right is the table
+     */
+    private Pair<String, String> getDatabaseAndTableName(String tableName) {
+ String databaseName = "default";
+ if (tableName.contains(POINT)) {
+ String[] tableNameArray = tableName.split("\\.");
+ databaseName = tableNameArray[0];
+ tableName = tableNameArray[1];
+ } else {
+            LOG.warn("Table name doesn't include a database name, using 'default' as the database");
+ }
+ return Pair.of(databaseName, tableName);
+ }
+
+ /**
+ * Get delete one row statement by condition fields
+ */
+ @Override
+ public String getDeleteStatement(String tableName, String[] conditionFields) {
+ String conditionClause =
+ Arrays.stream(conditionFields)
+ .map(f -> format("%s = :%s", quoteIdentifier(f), f))
+ .collect(Collectors.joining(" AND "));
+        Pair<String, String> databaseAndTableName = getDatabaseAndTableName(tableName);
+ return "ALTER TABLE "
+ + quoteIdentifier(databaseAndTableName.getLeft())
+ + POINT
+ + quoteIdentifier(databaseAndTableName.getRight())
+ + " DELETE WHERE " + conditionClause;
+ }
+
+ @Override
+ public String getInsertIntoStatement(String tableName, String[] fieldNames) {
+ String columns =
+ Arrays.stream(fieldNames)
+ .map(this::quoteIdentifier)
+ .collect(Collectors.joining(", "));
+ String placeholders =
+ Arrays.stream(fieldNames).map(f -> ":" + f).collect(Collectors.joining(", "));
+        Pair<String, String> databaseAndTableName = getDatabaseAndTableName(tableName);
+ return "INSERT INTO "
+ + quoteIdentifier(databaseAndTableName.getLeft())
+ + POINT
+ + quoteIdentifier(databaseAndTableName.getRight())
+ + "("
+ + columns
+ + ")"
+ + " VALUES ("
+ + placeholders
+ + ")";
+ }
+
+ @Override
+ public String getSelectFromStatement(
+ String tableName, String[] selectFields, String[] conditionFields) {
+ String selectExpressions =
+ Arrays.stream(selectFields)
+ .map(this::quoteIdentifier)
+ .collect(Collectors.joining(", "));
+ String fieldExpressions =
+ Arrays.stream(conditionFields)
+ .map(f -> format("%s = :%s", quoteIdentifier(f), f))
+ .collect(Collectors.joining(" AND "));
+        Pair<String, String> databaseAndTableName = getDatabaseAndTableName(tableName);
+ return "SELECT "
+ + selectExpressions
+ + " FROM "
+ + quoteIdentifier(databaseAndTableName.getLeft())
+ + POINT
+ + quoteIdentifier(databaseAndTableName.getRight())
+ + (conditionFields.length > 0 ? " WHERE " + fieldExpressions : "");
+ }
+
+ @Override
+ public String getRowExistsStatement(String tableName, String[] conditionFields) {
+ String fieldExpressions =
+ Arrays.stream(conditionFields)
+ .map(f -> format("%s = :%s", quoteIdentifier(f), f))
+ .collect(Collectors.joining(" AND "));
+        Pair<String, String> pair = getDatabaseAndTableName(tableName);
+ return "SELECT 1 FROM "
+ + quoteIdentifier(pair.getLeft())
+ + POINT
+ + quoteIdentifier(pair.getRight())
+ + " WHERE " + fieldExpressions;
+ }
+
+}
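
Because ClickHouse has no conventional UPDATE/DELETE statements, the dialect maps both to ALTER TABLE mutations, as the methods above show. A sketch of the generated SQL (table and field names are placeholders):

```java
import org.apache.inlong.sort.jdbc.dialect.clickhouse.ClickHouseDialect;

public class ClickHouseDialectExample {

    public static void main(String[] args) {
        ClickHouseDialect dialect = new ClickHouseDialect();
        // INSERT INTO `db`.`users`(`id`, `name`) VALUES (:id, :name)
        System.out.println(dialect.getInsertIntoStatement("db.users", new String[]{"id", "name"}));
        // ALTER TABLE `db`.`users` UPDATE `name` = :name WHERE `id` = :id  (an asynchronous mutation)
        System.out.println(dialect.getUpdateStatement("db.users", new String[]{"id", "name"}, new String[]{"id"}));
        // ALTER TABLE `db`.`users` DELETE WHERE `id` = :id
        System.out.println(dialect.getDeleteStatement("db.users", new String[]{"id"}));
    }
}
```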
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/clickhouse/ClickHouseDialectFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/clickhouse/ClickHouseDialectFactory.java
new file mode 100644
index 00000000000..2e4e4f110a3
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/clickhouse/ClickHouseDialectFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.dialect.clickhouse;
+
+import org.apache.inlong.sort.jdbc.dialect.JdbcDialectFactory;
+
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+
+public class ClickHouseDialectFactory implements JdbcDialectFactory {
+
+ @Override
+ public boolean acceptsURL(String url) {
+ return url.startsWith("jdbc:clickhouse:");
+ }
+
+ @Override
+ public JdbcDialect create() {
+ return new ClickHouseDialect();
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java
new file mode 100644
index 00000000000..fa49268f9b8
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+
+/**
+ * A generic SinkFunction for JDBC.
+ * Modified from {@link org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction}.
+ */
+@Internal
+public class GenericJdbcSinkFunction<T> extends RichSinkFunction<T>
+        implements
+        CheckpointedFunction,
+        InputTypeConfigurable {
+
+    private final JdbcOutputFormat<T, ?, ?> outputFormat;
+
+    public GenericJdbcSinkFunction(@Nonnull JdbcOutputFormat<T, ?, ?> outputFormat) {
+ this.outputFormat = Preconditions.checkNotNull(outputFormat);
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ RuntimeContext ctx = getRuntimeContext();
+ outputFormat.setRuntimeContext(ctx);
+ outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
+ }
+
+ @Override
+ public void invoke(T value, Context context) throws IOException {
+ outputFormat.writeRecord(value);
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) {
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ outputFormat.flush();
+ }
+
+ @Override
+ public void close() {
+ outputFormat.close();
+ }
+
+ @Override
+    public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
+ outputFormat.setInputType(type, executionConfig);
+ }
+}
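
Note that snapshotState() flushes the buffered batch on every checkpoint, which is what gives the sink its at-least-once guarantee. A minimal wiring sketch, assuming the output format has been built elsewhere (e.g. by the JdbcOutputFormatBuilder referenced in JdbcDynamicTableSink below):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.inlong.sort.jdbc.internal.GenericJdbcSinkFunction;
import org.apache.inlong.sort.jdbc.internal.JdbcOutputFormat;

public class SinkWiringExample {

    // Each checkpoint barrier triggers snapshotState() -> outputFormat.flush().
    static void attachJdbcSink(DataStream<RowData> stream, JdbcOutputFormat<RowData, ?, ?> format) {
        stream.addSink(new GenericJdbcSinkFunction<>(format));
    }
}
```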
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcOutputFormat.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcOutputFormat.java
new file mode 100644
index 00000000000..b953d42f135
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcOutputFormat.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.internal;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.CalculateObjectSizeUtils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
+import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.utils.JdbcUtils;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.function.SerializableFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.Flushable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.sql.SQLException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import static org.apache.flink.connector.jdbc.utils.JdbcUtils.setRecordToStatement;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A JDBC OutputFormat that batches records before writing them to the database.
+ * Modified from {@link org.apache.flink.connector.jdbc.internal.JdbcOutputFormat}.
+ */
+@Internal
+public class JdbcOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatchStatementExecutor<JdbcIn>>
+        extends
+        RichOutputFormat<In>
+ implements
+ Flushable,
+ InputTypeConfigurable {
+
+ protected final JdbcConnectionProvider connectionProvider;
+ @Nullable
+    private TypeSerializer<In> serializer;
+
+ private final String inlongMetric;
+ private final String auditHostAndPorts;
+ private final String auditKeys;
+ private SinkMetricData sinkMetricData;
+ private Long rowCount = 0L;
+ private Long dataSize = 0L;
+ @Override
+ @SuppressWarnings("unchecked")
+    public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
+ if (executionConfig.isObjectReuseEnabled()) {
+            this.serializer = (TypeSerializer<In>) type.createSerializer(executionConfig);
+ }
+ }
+
+    /**
+     * An interface to extract a value from a given argument.
+     *
+     * @param <F> The type of given argument
+     * @param <T> The type of the return value
+     */
+    public interface RecordExtractor<F, T> extends Function<F, T>, Serializable {
+
+        static <T> RecordExtractor<T, T> identity() {
+            return x -> x;
+        }
+    }
+
+    /**
+     * A factory for creating {@link JdbcBatchStatementExecutor} instances.
+     *
+     * @param <T> The type of instance.
+     */
+    public interface StatementExecutorFactory<T extends JdbcBatchStatementExecutor<?>>
+            extends
+            SerializableFunction<RuntimeContext, T> {
+    }
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(JdbcOutputFormat.class);
+
+ private final JdbcExecutionOptions executionOptions;
+    private final StatementExecutorFactory<JdbcExec> statementExecutorFactory;
+    private final RecordExtractor<In, JdbcIn> jdbcRecordExtractor;
+
+ private transient JdbcExec jdbcStatementExecutor;
+ private transient int batchCount = 0;
+ private transient volatile boolean closed = false;
+
+ private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;
+ private transient volatile Exception flushException;
+
+ public JdbcOutputFormat(
+ @Nonnull JdbcConnectionProvider connectionProvider,
+ @Nonnull JdbcExecutionOptions executionOptions,
+            @Nonnull StatementExecutorFactory<JdbcExec> statementExecutorFactory,
+            @Nonnull RecordExtractor<In, JdbcIn> recordExtractor,
+ String inlongMetric,
+ String auditHostAndPorts,
+ String auditKeys) {
+ this.connectionProvider = checkNotNull(connectionProvider);
+ this.executionOptions = checkNotNull(executionOptions);
+ this.statementExecutorFactory = checkNotNull(statementExecutorFactory);
+ this.jdbcRecordExtractor = checkNotNull(recordExtractor);
+ // audit
+ this.inlongMetric = inlongMetric;
+ this.auditHostAndPorts = auditHostAndPorts;
+ this.auditKeys = auditKeys;
+ }
+
+ @Override
+ public void configure(Configuration parameters) {
+ }
+
+ /**
+ * Connects to the target database and initializes the prepared statement.
+ *
+ * @param taskNumber The number of the parallel instance.
+ */
+ @Override
+ public void open(int taskNumber, int numTasks) throws IOException {
+ try {
+ connectionProvider.getOrEstablishConnection();
+ } catch (Exception e) {
+ throw new IOException("unable to open JDBC writer", e);
+ }
+
+ // audit
+ MetricOption metricOption = MetricOption.builder()
+ .withInlongLabels(inlongMetric)
+ .withAuditAddress(auditHostAndPorts)
+ .withAuditKeys(auditKeys)
+ .build();
+ if (metricOption != null) {
+ sinkMetricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
+ }
+ jdbcStatementExecutor = createAndOpenStatementExecutor(statementExecutorFactory);
+ if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
+ this.scheduler =
+ Executors.newScheduledThreadPool(
+ 1, new ExecutorThreadFactory("jdbc-upsert-output-format"));
+ this.scheduledFuture =
+ this.scheduler.scheduleWithFixedDelay(
+ () -> {
+ synchronized (JdbcOutputFormat.this) {
+ if (!closed) {
+ try {
+ flush();
+ } catch (Exception e) {
+ flushException = e;
+ }
+ }
+ }
+ },
+ executionOptions.getBatchIntervalMs(),
+ executionOptions.getBatchIntervalMs(),
+ TimeUnit.MILLISECONDS);
+ }
+ }
+
+    private JdbcExec createAndOpenStatementExecutor(
+            StatementExecutorFactory<JdbcExec> statementExecutorFactory) throws IOException {
+ JdbcExec exec = statementExecutorFactory.apply(getRuntimeContext());
+ try {
+ exec.prepareStatements(connectionProvider.getConnection());
+ } catch (SQLException e) {
+ throw new IOException("unable to open JDBC writer", e);
+ }
+ return exec;
+ }
+
+ private void checkFlushException() {
+ if (flushException != null) {
+ throw new RuntimeException("Writing records to JDBC failed.", flushException);
+ }
+ }
+
+ @Override
+ public final synchronized void writeRecord(In record) throws IOException {
+ checkFlushException();
+ updateMetric(record);
+
+ try {
+ In recordCopy = copyIfNecessary(record);
+ addToBatch(record, jdbcRecordExtractor.apply(recordCopy));
+ batchCount++;
+ if (executionOptions.getBatchSize() > 0
+ && batchCount >= executionOptions.getBatchSize()) {
+ flush();
+ if (sinkMetricData != null) {
+ sinkMetricData.invoke(rowCount, dataSize);
+ rowCount = 0L;
+ }
+ }
+ } catch (Exception e) {
+ throw new IOException("Writing records to JDBC failed.", e);
+ }
+ }
+ private void updateMetric(In record) {
+ rowCount++;
+ dataSize += CalculateObjectSizeUtils.getDataSize(record);
+ }
+ private In copyIfNecessary(In record) {
+ return serializer == null ? record : serializer.copy(record);
+ }
+
+ protected void addToBatch(In original, JdbcIn extracted) throws SQLException {
+ jdbcStatementExecutor.addToBatch(extracted);
+ }
+
+ @Override
+ public synchronized void flush() throws IOException {
+ if (batchCount == 0) {
+ return;
+ }
+ checkFlushException();
+
+ for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
+ try {
+ attemptFlush();
+ batchCount = 0;
+ break;
+ } catch (SQLException e) {
+ LOG.error("JDBC executeBatch error, retry times = {}", i, e);
+ if (i >= executionOptions.getMaxRetries()) {
+ throw new IOException(e);
+ }
+ try {
+ if (!connectionProvider.isConnectionValid()) {
+ updateExecutor(true);
+ }
+ } catch (Exception exception) {
+ LOG.error(
+ "JDBC connection is not valid, and reestablish connection failed.",
+ exception);
+ throw new IOException("Reestablish JDBC connection failed", exception);
+ }
+ try {
+ Thread.sleep(1000 * i);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ throw new IOException(
+ "unable to flush; interrupted while doing another attempt", e);
+ }
+ }
+ }
+ }
+
+ protected void attemptFlush() throws SQLException {
+ jdbcStatementExecutor.executeBatch();
+ }
+
+ /** Executes prepared statement and closes all resources of this instance. */
+ @Override
+ public synchronized void close() {
+ if (!closed) {
+ closed = true;
+
+ if (this.scheduledFuture != null) {
+ scheduledFuture.cancel(false);
+ this.scheduler.shutdown();
+ }
+
+ if (batchCount > 0) {
+ try {
+ flush();
+ } catch (Exception e) {
+ LOG.warn("Writing records to JDBC failed.", e);
+ throw new RuntimeException("Writing records to JDBC failed.", e);
+ }
+ }
+
+ // audit, flush metrics data
+ if (sinkMetricData != null) {
+ sinkMetricData.flushAuditData();
+ }
+
+ try {
+ if (jdbcStatementExecutor != null) {
+ jdbcStatementExecutor.closeStatements();
+ }
+ } catch (SQLException e) {
+ LOG.warn("Close JDBC writer failed.", e);
+ }
+ }
+ connectionProvider.closeConnection();
+ checkFlushException();
+ }
+
+    static JdbcBatchStatementExecutor<Row> createSimpleRowExecutor(
+ String sql, int[] fieldTypes, boolean objectReuse) {
+ return JdbcBatchStatementExecutor.simple(
+ sql,
+ createRowJdbcStatementBuilder(fieldTypes),
+ objectReuse ? Row::copy : Function.identity());
+ }
+
+ /**
+ * Creates a {@link JdbcStatementBuilder} for {@link Row} using the provided SQL types array.
+ * Uses {@link JdbcUtils#setRecordToStatement}
+ */
+    static JdbcStatementBuilder<Row> createRowJdbcStatementBuilder(int[] types) {
+ return (st, record) -> setRecordToStatement(st, types, record);
+ }
+
+ public void updateExecutor(boolean reconnect) throws SQLException, ClassNotFoundException {
+ jdbcStatementExecutor.closeStatements();
+ jdbcStatementExecutor.prepareStatements(
+ reconnect
+ ? connectionProvider.reestablishConnection()
+ : connectionProvider.getConnection());
+ }
+}
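
The flushing behavior above is driven entirely by JdbcExecutionOptions: a batch is written when batchCount reaches the configured batch size, when the background thread fires after the batch interval, or on checkpoint/close, and a failed executeBatch() is retried up to the configured maximum with a linearly growing backoff (Thread.sleep(1000 * i)). A configuration sketch with placeholder values:

```java
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;

public class ExecutionOptionsExample {

    public static void main(String[] args) {
        JdbcExecutionOptions options = JdbcExecutionOptions.builder()
                .withBatchSize(1000)        // flush once 1000 records are buffered
                .withBatchIntervalMs(5000)  // or after 5 seconds, via the scheduled flusher
                .withMaxRetries(3)          // retry a failed flush up to 3 times
                .build();
        System.out.println(options.getBatchSize());
    }
}
```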
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java
new file mode 100644
index 00000000000..2d4e27fafcb
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.internal;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.executor.InsertOrUpdateJdbcExecutor;
+import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.function.Function;
+
+import static org.apache.flink.connector.jdbc.utils.JdbcUtils.getPrimaryKey;
+import static org.apache.flink.connector.jdbc.utils.JdbcUtils.setRecordToStatement;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Upsert JDBC output format.
+ * Modified from {@link org.apache.flink.connector.jdbc.internal.TableJdbcUpsertOutputFormat}.
+ */
+class TableJdbcUpsertOutputFormat
+        extends
+        JdbcOutputFormat<Tuple2<Boolean, Row>, Row, JdbcBatchStatementExecutor<Row>> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TableJdbcUpsertOutputFormat.class);
+
+    private JdbcBatchStatementExecutor<Row> deleteExecutor;
+    private final StatementExecutorFactory<JdbcBatchStatementExecutor<Row>> deleteStatementExecutorFactory;
+
+ TableJdbcUpsertOutputFormat(
+ JdbcConnectionProvider connectionProvider,
+ JdbcDmlOptions dmlOptions,
+ JdbcExecutionOptions batchOptions,
+ String inlongMetric,
+ String auditHostAndPorts,
+ String auditKeys) {
+ this(
+ connectionProvider,
+ batchOptions,
+ ctx -> createUpsertRowExecutor(dmlOptions, ctx),
+ ctx -> createDeleteExecutor(dmlOptions, ctx),
+ inlongMetric,
+ auditHostAndPorts,
+ auditKeys);
+ }
+
+ @VisibleForTesting
+ TableJdbcUpsertOutputFormat(
+ JdbcConnectionProvider connectionProvider,
+ JdbcExecutionOptions batchOptions,
+            StatementExecutorFactory<JdbcBatchStatementExecutor<Row>> statementExecutorFactory,
+            StatementExecutorFactory<JdbcBatchStatementExecutor<Row>> deleteStatementExecutorFactory,
+ String inlongMetric,
+ String auditHostAndPorts,
+ String auditKeys) {
+ super(connectionProvider, batchOptions, statementExecutorFactory, tuple2 -> tuple2.f1,
+ inlongMetric, auditHostAndPorts, auditKeys);
+ this.deleteStatementExecutorFactory = deleteStatementExecutorFactory;
+ }
+
+ @Override
+ public void open(int taskNumber, int numTasks) throws IOException {
+ super.open(taskNumber, numTasks);
+ deleteExecutor = deleteStatementExecutorFactory.apply(getRuntimeContext());
+ try {
+ deleteExecutor.prepareStatements(connectionProvider.getConnection());
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ }
+
+    private static JdbcBatchStatementExecutor<Row> createDeleteExecutor(
+ JdbcDmlOptions dmlOptions, RuntimeContext ctx) {
+ int[] pkFields =
+ Arrays.stream(dmlOptions.getFieldNames())
+ .mapToInt(Arrays.asList(dmlOptions.getFieldNames())::indexOf)
+ .toArray();
+ int[] pkTypes =
+ dmlOptions.getFieldTypes() == null
+ ? null
+ : Arrays.stream(pkFields).map(f -> dmlOptions.getFieldTypes()[f]).toArray();
+ String deleteSql =
+ FieldNamedPreparedStatementImpl.parseNamedStatement(
+ dmlOptions
+ .getDialect()
+ .getDeleteStatement(
+ dmlOptions.getTableName(), dmlOptions.getFieldNames()),
+ new HashMap<>());
+ return createKeyedRowExecutor(pkFields, pkTypes, deleteSql);
+ }
+
+ @Override
+    protected void addToBatch(Tuple2<Boolean, Row> original, Row extracted) throws SQLException {
+ if (original.f0) {
+ super.addToBatch(original, extracted);
+ } else {
+ deleteExecutor.addToBatch(extracted);
+ }
+ }
+
+ @Override
+ public synchronized void close() {
+ try {
+ super.close();
+ } finally {
+ try {
+ if (deleteExecutor != null) {
+ deleteExecutor.closeStatements();
+ }
+ } catch (SQLException e) {
+ LOG.warn("unable to close delete statement runner", e);
+ }
+ }
+ }
+
+ @Override
+ protected void attemptFlush() throws SQLException {
+ super.attemptFlush();
+ deleteExecutor.executeBatch();
+ }
+
+ @Override
+ public void updateExecutor(boolean reconnect) throws SQLException, ClassNotFoundException {
+ super.updateExecutor(reconnect);
+ deleteExecutor.closeStatements();
+ deleteExecutor.prepareStatements(connectionProvider.getConnection());
+ }
+
+    private static JdbcBatchStatementExecutor<Row> createKeyedRowExecutor(
+ int[] pkFields, int[] pkTypes, String sql) {
+ return JdbcBatchStatementExecutor.keyed(
+ sql,
+ createRowKeyExtractor(pkFields),
+ (st, record) -> setRecordToStatement(
+ st, pkTypes, createRowKeyExtractor(pkFields).apply(record)));
+ }
+
+    private static JdbcBatchStatementExecutor<Row> createUpsertRowExecutor(
+ JdbcDmlOptions opt, RuntimeContext ctx) {
+ checkArgument(opt.getKeyFields().isPresent());
+
+ int[] pkFields =
+ Arrays.stream(opt.getKeyFields().get())
+ .mapToInt(Arrays.asList(opt.getFieldNames())::indexOf)
+ .toArray();
+ int[] pkTypes =
+ opt.getFieldTypes() == null
+ ? null
+ : Arrays.stream(pkFields).map(f -> opt.getFieldTypes()[f]).toArray();
+
+ return opt.getDialect()
+ .getUpsertStatement(
+ opt.getTableName(), opt.getFieldNames(), opt.getKeyFields().get())
+ .map(
+ sql -> createSimpleRowExecutor(
+ parseNamedStatement(sql),
+ opt.getFieldTypes(),
+ ctx.getExecutionConfig().isObjectReuseEnabled()))
+ .orElseGet(
+ () -> new InsertOrUpdateJdbcExecutor<>(
+ parseNamedStatement(
+ opt.getDialect()
+ .getRowExistsStatement(
+ opt.getTableName(),
+ opt.getKeyFields().get())),
+ parseNamedStatement(
+ opt.getDialect()
+ .getInsertIntoStatement(
+ opt.getTableName(),
+ opt.getFieldNames())),
+ parseNamedStatement(
+ opt.getDialect()
+ .getUpdateStatement(
+ opt.getTableName(),
+ opt.getFieldNames(),
+ opt.getKeyFields().get())),
+ createRowJdbcStatementBuilder(pkTypes),
+ createRowJdbcStatementBuilder(opt.getFieldTypes()),
+ createRowJdbcStatementBuilder(opt.getFieldTypes()),
+ createRowKeyExtractor(pkFields),
+ ctx.getExecutionConfig().isObjectReuseEnabled()
+ ? Row::copy
+ : Function.identity()));
+ }
+
+ private static String parseNamedStatement(String statement) {
+ return FieldNamedPreparedStatementImpl.parseNamedStatement(statement, new HashMap<>());
+ }
+
+    private static Function<Row, Row> createRowKeyExtractor(int[] pkFields) {
+ return row -> getPrimaryKey(row, pkFields);
+ }
+}
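
The Tuple2<Boolean, Row> element type carries the change flag that addToBatch() dispatches on: f0 == true routes the row to the upsert executor, f0 == false to the delete executor. An illustrative sketch of that contract (values are placeholders):

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;

public class UpsertFlagExample {

    public static void main(String[] args) {
        // true -> upsert path (INSERT / UPDATE_AFTER changelog entries)
        Tuple2<Boolean, Row> upsert = Tuple2.of(true, Row.of(1L, "alice"));
        // false -> delete path (DELETE changelog entries)
        Tuple2<Boolean, Row> delete = Tuple2.of(false, Row.of(1L, "alice"));
        System.out.println(upsert + " / " + delete);
    }
}
```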
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
new file mode 100644
index 00000000000..4efc84e3be7
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.table;
+
+import org.apache.inlong.sort.jdbc.dialect.JdbcDialectLoader;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions;
+import org.apache.flink.connector.jdbc.table.JdbcDynamicTableSource;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.*;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.TABLE_NAME;
+import static org.apache.inlong.sort.base.Constants.*;
+
+/**
+ * Factory for creating configured instances of {@link JdbcDynamicTableSource} and
+ * {@link JdbcDynamicTableSink}.
+ * Modified from {@link org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory}.
+ */
+@Internal
+public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+    public static final ConfigOption<String> DIALECT_IMPL =
+ ConfigOptions.key("dialect-impl")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The JDBC Custom Dialect.");
+
+ public static final String IDENTIFIER = "jdbc-inlong";
+
+ @Override
+ public DynamicTableSink createDynamicTableSink(Context context) {
+ final FactoryUtil.TableFactoryHelper helper =
+ FactoryUtil.createTableFactoryHelper(this, context);
+ final ReadableConfig config = helper.getOptions();
+
+ helper.validate();
+ validateConfigOptions(config);
+ validateDataTypeWithJdbcDialect(context.getPhysicalRowDataType(), config.get(URL));
+ JdbcConnectorOptions jdbcOptions = getJdbcOptions(config);
+
+ // inlong audit
+ String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
+ String auditHostAndPorts = config.getOptional(INLONG_AUDIT).orElse(null);
+ String auditKeys = config.getOptional(AUDIT_KEYS).orElse(null);
+
+ return new JdbcDynamicTableSink(
+ jdbcOptions,
+ getJdbcExecutionOptions(config),
+ getJdbcDmlOptions(
+ jdbcOptions,
+ context.getPhysicalRowDataType(),
+ context.getPrimaryKeyIndexes()),
+ context.getPhysicalRowDataType(),
+ inlongMetric,
+ auditHostAndPorts,
+ auditKeys);
+ }
+
+ @Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+ final FactoryUtil.TableFactoryHelper helper =
+ FactoryUtil.createTableFactoryHelper(this, context);
+ final ReadableConfig config = helper.getOptions();
+
+ helper.validate();
+ validateConfigOptions(config);
+ validateDataTypeWithJdbcDialect(context.getPhysicalRowDataType(), config.get(URL));
+ return new JdbcDynamicTableSource(
+ getJdbcOptions(helper.getOptions()),
+ getJdbcReadOptions(helper.getOptions()),
+ getJdbcLookupOptions(helper.getOptions()),
+ context.getPhysicalRowDataType());
+ }
+
+ private static void validateDataTypeWithJdbcDialect(DataType dataType, String url) {
+ final JdbcDialect dialect = JdbcDialectLoader.load(url);
+ dialect.validate((RowType) dataType.getLogicalType());
+ }
+
+ private JdbcConnectorOptions getJdbcOptions(ReadableConfig readableConfig) {
+ final String url = readableConfig.get(URL);
+ final JdbcConnectorOptions.Builder builder =
+ JdbcConnectorOptions.builder()
+ .setDBUrl(url)
+ .setTableName(readableConfig.get(TABLE_NAME))
+ .setDialect(JdbcDialectLoader.load(url))
+ .setParallelism(readableConfig.getOptional(SINK_PARALLELISM).orElse(null))
+ .setConnectionCheckTimeoutSeconds(
+ (int) readableConfig.get(MAX_RETRY_TIMEOUT).getSeconds());
+
+ readableConfig.getOptional(DRIVER).ifPresent(builder::setDriverName);
+ readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername);
+ readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword);
+ return builder.build();
+ }
+
+ private JdbcReadOptions getJdbcReadOptions(ReadableConfig readableConfig) {
+        final Optional<String> partitionColumnName =
+ readableConfig.getOptional(SCAN_PARTITION_COLUMN);
+ final JdbcReadOptions.Builder builder = JdbcReadOptions.builder();
+ if (partitionColumnName.isPresent()) {
+ builder.setPartitionColumnName(partitionColumnName.get());
+ builder.setPartitionLowerBound(readableConfig.get(SCAN_PARTITION_LOWER_BOUND));
+ builder.setPartitionUpperBound(readableConfig.get(SCAN_PARTITION_UPPER_BOUND));
+ builder.setNumPartitions(readableConfig.get(SCAN_PARTITION_NUM));
+ }
+ readableConfig.getOptional(SCAN_FETCH_SIZE).ifPresent(builder::setFetchSize);
+ builder.setAutoCommit(readableConfig.get(SCAN_AUTO_COMMIT));
+ return builder.build();
+ }
+
+ private JdbcLookupOptions getJdbcLookupOptions(ReadableConfig readableConfig) {
+ return new JdbcLookupOptions(
+ readableConfig.get(LOOKUP_CACHE_MAX_ROWS),
+ readableConfig.get(LOOKUP_CACHE_TTL).toMillis(),
+ readableConfig.get(LOOKUP_MAX_RETRIES),
+ readableConfig.get(LOOKUP_CACHE_MISSING_KEY));
+ }
+
+ private JdbcExecutionOptions getJdbcExecutionOptions(ReadableConfig config) {
+ final JdbcExecutionOptions.Builder builder = new JdbcExecutionOptions.Builder();
+ builder.withBatchSize(config.get(SINK_BUFFER_FLUSH_MAX_ROWS));
+ builder.withBatchIntervalMs(config.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
+ builder.withMaxRetries(config.get(SINK_MAX_RETRIES));
+ return builder.build();
+ }
+
+ private JdbcDmlOptions getJdbcDmlOptions(
+ JdbcConnectorOptions jdbcOptions, DataType dataType, int[] primaryKeyIndexes) {
+
+ String[] keyFields =
+ Arrays.stream(primaryKeyIndexes)
+ .mapToObj(i -> DataType.getFieldNames(dataType).get(i))
+ .toArray(String[]::new);
+
+ return JdbcDmlOptions.builder()
+ .withTableName(jdbcOptions.getTableName())
+ .withDialect(jdbcOptions.getDialect())
+ .withFieldNames(DataType.getFieldNames(dataType).toArray(new String[0]))
+ .withKeyFields(keyFields.length > 0 ? keyFields : null)
+ .build();
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        Set<ConfigOption<?>> requiredOptions = new HashSet<>();
+ requiredOptions.add(URL);
+ requiredOptions.add(TABLE_NAME);
+ return requiredOptions;
+ }
+
+ @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        Set<ConfigOption<?>> optionalOptions = new HashSet<>();
+ optionalOptions.add(DRIVER);
+ optionalOptions.add(USERNAME);
+ optionalOptions.add(PASSWORD);
+ optionalOptions.add(SCAN_PARTITION_COLUMN);
+ optionalOptions.add(SCAN_PARTITION_LOWER_BOUND);
+ optionalOptions.add(SCAN_PARTITION_UPPER_BOUND);
+ optionalOptions.add(SCAN_PARTITION_NUM);
+ optionalOptions.add(SCAN_FETCH_SIZE);
+ optionalOptions.add(SCAN_AUTO_COMMIT);
+ optionalOptions.add(LOOKUP_CACHE_MAX_ROWS);
+ optionalOptions.add(LOOKUP_CACHE_TTL);
+ optionalOptions.add(LOOKUP_MAX_RETRIES);
+ optionalOptions.add(LOOKUP_CACHE_MISSING_KEY);
+ optionalOptions.add(SINK_BUFFER_FLUSH_MAX_ROWS);
+ optionalOptions.add(SINK_BUFFER_FLUSH_INTERVAL);
+ optionalOptions.add(SINK_MAX_RETRIES);
+ optionalOptions.add(SINK_PARALLELISM);
+ optionalOptions.add(MAX_RETRY_TIMEOUT);
+ optionalOptions.add(DIALECT_IMPL);
+ optionalOptions.add(AUDIT_KEYS);
+ optionalOptions.add(INLONG_METRIC);
+ optionalOptions.add(INLONG_AUDIT);
+ return optionalOptions;
+ }
+
+ @Override
+    public Set<ConfigOption<?>> forwardOptions() {
+ return Stream.of(
+ URL,
+ TABLE_NAME,
+ USERNAME,
+ PASSWORD,
+ DRIVER,
+ SINK_BUFFER_FLUSH_MAX_ROWS,
+ SINK_BUFFER_FLUSH_INTERVAL,
+ SINK_MAX_RETRIES,
+ MAX_RETRY_TIMEOUT,
+ SCAN_FETCH_SIZE,
+ SCAN_AUTO_COMMIT,
+ LOOKUP_CACHE_MAX_ROWS,
+ LOOKUP_CACHE_TTL,
+ LOOKUP_MAX_RETRIES,
+ LOOKUP_CACHE_MISSING_KEY)
+ .collect(Collectors.toSet());
+ }
+
+ private void validateConfigOptions(ReadableConfig config) {
+ String jdbcUrl = config.get(URL);
+ JdbcDialectLoader.load(jdbcUrl);
+
+        checkAllOrNone(config, new ConfigOption<?>[]{USERNAME, PASSWORD});
+
+ checkAllOrNone(
+ config,
+                new ConfigOption<?>[]{
+ SCAN_PARTITION_COLUMN,
+ SCAN_PARTITION_NUM,
+ SCAN_PARTITION_LOWER_BOUND,
+ SCAN_PARTITION_UPPER_BOUND
+ });
+
+ if (config.getOptional(SCAN_PARTITION_LOWER_BOUND).isPresent()
+ && config.getOptional(SCAN_PARTITION_UPPER_BOUND).isPresent()) {
+ long lowerBound = config.get(SCAN_PARTITION_LOWER_BOUND);
+ long upperBound = config.get(SCAN_PARTITION_UPPER_BOUND);
+ if (lowerBound > upperBound) {
+ throw new IllegalArgumentException(
+ String.format(
+ "'%s'='%s' must not be larger than '%s'='%s'.",
+ SCAN_PARTITION_LOWER_BOUND.key(),
+ lowerBound,
+ SCAN_PARTITION_UPPER_BOUND.key(),
+ upperBound));
+ }
+ }
+
+        checkAllOrNone(config, new ConfigOption<?>[]{LOOKUP_CACHE_MAX_ROWS, LOOKUP_CACHE_TTL});
+
+ if (config.get(LOOKUP_MAX_RETRIES) < 0) {
+ throw new IllegalArgumentException(
+ String.format(
+ "The value of '%s' option shouldn't be negative, but is %s.",
+ LOOKUP_MAX_RETRIES.key(), config.get(LOOKUP_MAX_RETRIES)));
+ }
+
+ if (config.get(SINK_MAX_RETRIES) < 0) {
+ throw new IllegalArgumentException(
+ String.format(
+ "The value of '%s' option shouldn't be negative, but is %s.",
+ SINK_MAX_RETRIES.key(), config.get(SINK_MAX_RETRIES)));
+ }
+
+ if (config.get(MAX_RETRY_TIMEOUT).getSeconds() <= 0) {
+ throw new IllegalArgumentException(
+ String.format(
+ "The value of '%s' option must be in second granularity and shouldn't be smaller than 1 second, but is %s.",
+ MAX_RETRY_TIMEOUT.key(),
+ config.get(
+ ConfigOptions.key(MAX_RETRY_TIMEOUT.key())
+ .stringType()
+ .noDefaultValue())));
+ }
+ }
+
+    private void checkAllOrNone(ReadableConfig config, ConfigOption<?>[] configOptions) {
+ int presentCount = 0;
+        for (ConfigOption<?> configOption : configOptions) {
+ if (config.getOptional(configOption).isPresent()) {
+ presentCount++;
+ }
+ }
+ String[] propertyNames =
+ Arrays.stream(configOptions).map(ConfigOption::key).toArray(String[]::new);
+ Preconditions.checkArgument(
+ configOptions.length == presentCount || presentCount == 0,
+ "Either all or none of the following options should be provided:\n"
+ + String.join("\n", propertyNames));
+ }
+}
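
The factory registers under the identifier "jdbc-inlong", so a sink table backed by it can be declared as in the following sketch; the option keys are the standard Flink JDBC keys this factory reuses, and the URL and table names are placeholders:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JdbcInlongSinkExample {

    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.executeSql(
                "CREATE TABLE clickhouse_sink (\n"
                        + "  id BIGINT,\n"
                        + "  name STRING,\n"
                        + "  PRIMARY KEY (id) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'connector' = 'jdbc-inlong',\n"
                        + "  'url' = 'jdbc:clickhouse://localhost:8123/default',\n"
                        + "  'table-name' = 'default.users',\n"
                        + "  'sink.buffer-flush.max-rows' = '1000',\n"
                        + "  'sink.buffer-flush.interval' = '5s'\n"
                        + ")");
    }
}
```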
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
new file mode 100644
index 00000000000..d216728dfa8
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.table;
+
+import org.apache.inlong.sort.jdbc.internal.GenericJdbcSinkFunction;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkState;
+/**
+ * A {@link DynamicTableSink} for JDBC.
+ * Modified from {@link org.apache.flink.connector.jdbc.table.JdbcDynamicTableSink}.
+ */
+@Internal
+public class JdbcDynamicTableSink implements DynamicTableSink {
+
+ private final JdbcConnectorOptions jdbcOptions;
+ private final JdbcExecutionOptions executionOptions;
+ private final JdbcDmlOptions dmlOptions;
+ private final DataType physicalRowDataType;
+ private final String dialectName;
+
+ private final String inlongMetric;
+ private final String auditHostAndPorts;
+ private final String auditKeys;
+
+ public JdbcDynamicTableSink(
+ JdbcConnectorOptions jdbcOptions,
+ JdbcExecutionOptions executionOptions,
+ JdbcDmlOptions dmlOptions,
+ DataType physicalRowDataType,
+ String inlongMetric,
+ String auditHostAndPorts,
+ String auditKeys) {
+ this.jdbcOptions = jdbcOptions;
+ this.executionOptions = executionOptions;
+ this.dmlOptions = dmlOptions;
+ this.physicalRowDataType = physicalRowDataType;
+ this.dialectName = dmlOptions.getDialect().dialectName();
+ // InLong metric and audit settings
+ this.inlongMetric = inlongMetric;
+ this.auditHostAndPorts = auditHostAndPorts;
+ this.auditKeys = auditKeys;
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+ validatePrimaryKey(requestedMode);
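+ // UPDATE_BEFORE is intentionally excluded: the sink upserts by primary key, so only the after-image of an update needs to be written.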
+ return ChangelogMode.newBuilder()
+ .addContainedKind(RowKind.INSERT)
+ .addContainedKind(RowKind.DELETE)
+ .addContainedKind(RowKind.UPDATE_AFTER)
+ .build();
+ }
+
+ private void validatePrimaryKey(ChangelogMode requestedMode) {
+ checkState(
+ ChangelogMode.insertOnly().equals(requestedMode)
+ || dmlOptions.getKeyFields().isPresent(),
+ "please declare primary key for sink table when query contains update/delete record.");
+ }
+
+ @Override
+ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+ final TypeInformation<RowData> rowDataTypeInformation =
+ context.createTypeInformation(physicalRowDataType);
+ final JdbcOutputFormatBuilder builder = new JdbcOutputFormatBuilder();
+
+ builder.setJdbcOptions(jdbcOptions);
+ builder.setJdbcDmlOptions(dmlOptions);
+ builder.setJdbcExecutionOptions(executionOptions);
+ builder.setRowDataTypeInfo(rowDataTypeInformation);
+ builder.setFieldDataTypes(
+ DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]));
+ // propagate InLong metric and audit configuration to the output format
+ builder.setInlongMetric(inlongMetric);
+ builder.setAuditHostAndPorts(auditHostAndPorts);
+ builder.setAuditKeys(auditKeys);
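+ // wrap the output format in a SinkFunction and honor the user-configured sink parallelism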
+ return SinkFunctionProvider.of(
+ new GenericJdbcSinkFunction<>(builder.build()), jdbcOptions.getParallelism());
+ }
+
+ @Override
+ public DynamicTableSink copy() {
+ return new JdbcDynamicTableSink(
+ jdbcOptions, executionOptions, dmlOptions, physicalRowDataType,
+ inlongMetric, auditHostAndPorts, auditKeys);
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "JDBC:" + dialectName;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof JdbcDynamicTableSink)) {
+ return false;
+ }
+ JdbcDynamicTableSink that = (JdbcDynamicTableSink) o;
+ return Objects.equals(jdbcOptions, that.jdbcOptions)
+ && Objects.equals(executionOptions, that.executionOptions)
+ && Objects.equals(dmlOptions, that.dmlOptions)
+ && Objects.equals(physicalRowDataType, that.physicalRowDataType)
+ && Objects.equals(dialectName, that.dialectName)
+ && Objects.equals(inlongMetric, that.inlongMetric)
+ && Objects.equals(auditHostAndPorts, that.auditHostAndPorts)
+ && Objects.equals(auditKeys, that.auditKeys);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ jdbcOptions, executionOptions, dmlOptions, physicalRowDataType, dialectName,
+ inlongMetric, auditHostAndPorts, auditKeys);
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcOutputFormatBuilder.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcOutputFormatBuilder.java
new file mode 100644
index 00000000000..61eced0fb48
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcOutputFormatBuilder.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.table;
+
+import org.apache.inlong.sort.jdbc.internal.JdbcOutputFormat;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor;
+import org.apache.flink.connector.jdbc.internal.executor.TableBufferedStatementExecutor;
+import org.apache.flink.connector.jdbc.internal.executor.TableInsertOrUpdateStatementExecutor;
+import org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor;
+import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatement;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.function.Function;
+
+import static org.apache.flink.table.data.RowData.createFieldGetter;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Builder for {@link JdbcOutputFormat} for Table/SQL.
+ * Modified from {@link org.apache.flink.connector.jdbc.table.JdbcOutputFormatBuilder}.
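+ *
+ * <p>A minimal usage sketch; the option objects are assumed to be assembled by the caller:
+ * <pre>{@code
+ * JdbcOutputFormat<RowData, ?, ?> format =
+ *         new JdbcOutputFormatBuilder()
+ *                 .setJdbcOptions(jdbcOptions)
+ *                 .setJdbcDmlOptions(dmlOptions)
+ *                 .setJdbcExecutionOptions(executionOptions)
+ *                 .setRowDataTypeInfo(rowDataTypeInfo)
+ *                 .setFieldDataTypes(fieldDataTypes)
+ *                 .setInlongMetric(inlongMetric)
+ *                 .setAuditHostAndPorts(auditHostAndPorts)
+ *                 .setAuditKeys(auditKeys)
+ *                 .build();
+ * }</pre>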
+ */
+public class JdbcOutputFormatBuilder implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private JdbcConnectorOptions jdbcOptions;
+ private JdbcExecutionOptions executionOptions;
+ private JdbcDmlOptions dmlOptions;
+ private TypeInformation<RowData> rowDataTypeInformation;
+ private DataType[] fieldDataTypes;
+
+ // InLong metric and audit settings
+ private String inlongMetric;
+ private String auditHostAndPorts;
+ private String auditKeys;
+
+ public JdbcOutputFormatBuilder() {
+ }
+
+ public JdbcOutputFormatBuilder setJdbcOptions(JdbcConnectorOptions jdbcOptions) {
+ this.jdbcOptions = jdbcOptions;
+ return this;
+ }
+
+ public JdbcOutputFormatBuilder setJdbcExecutionOptions(JdbcExecutionOptions executionOptions) {
+ this.executionOptions = executionOptions;
+ return this;
+ }
+
+ public JdbcOutputFormatBuilder setJdbcDmlOptions(JdbcDmlOptions dmlOptions) {
+ this.dmlOptions = dmlOptions;
+ return this;
+ }
+
+ public JdbcOutputFormatBuilder setRowDataTypeInfo(TypeInformation<RowData> rowDataTypeInfo) {
+ this.rowDataTypeInformation = rowDataTypeInfo;
+ return this;
+ }
+
+ public JdbcOutputFormatBuilder setFieldDataTypes(DataType[] fieldDataTypes) {
+ this.fieldDataTypes = fieldDataTypes;
+ return this;
+ }
+
+ public JdbcOutputFormatBuilder setInlongMetric(String inlongMetric) {
+ this.inlongMetric = inlongMetric;
+ return this;
+ }
+
+ public JdbcOutputFormatBuilder setAuditHostAndPorts(String auditHostAndPorts) {
+ this.auditHostAndPorts = auditHostAndPorts;
+ return this;
+ }
+
+ public JdbcOutputFormatBuilder setAuditKeys(String auditKeys) {
+ this.auditKeys = auditKeys;
+ return this;
+ }
+
+ public JdbcOutputFormat<RowData, ?, ?> build() {
+ checkNotNull(jdbcOptions, "jdbc options can not be null");
+ checkNotNull(dmlOptions, "jdbc dml options can not be null");
+ checkNotNull(executionOptions, "jdbc execution options can not be null");
+
+ final LogicalType[] logicalTypes =
+ Arrays.stream(fieldDataTypes)
+ .map(DataType::getLogicalType)
+ .toArray(LogicalType[]::new);
+ if (dmlOptions.getKeyFields().isPresent() && dmlOptions.getKeyFields().get().length > 0) {
+ // upsert path: a primary key is declared, so buffered changes are reduced per key before flushing
+ return new JdbcOutputFormat<>(
+ new SimpleJdbcConnectionProvider(jdbcOptions),
+ executionOptions,
+ ctx -> createBufferReduceExecutor(
+ dmlOptions, ctx, rowDataTypeInformation, logicalTypes),
+ JdbcOutputFormat.RecordExtractor.identity(),
+ inlongMetric,
+ auditHostAndPorts,
+ auditKeys);
+ } else {
+ // append-only path: no primary key, so rows are written with plain INSERT statements
+ final String sql =
+ dmlOptions
+ .getDialect()
+ .getInsertIntoStatement(
+ dmlOptions.getTableName(), dmlOptions.getFieldNames());
+ return new JdbcOutputFormat<>(
+ new SimpleJdbcConnectionProvider(jdbcOptions),
+ executionOptions,
+ ctx -> createSimpleBufferedExecutor(
+ ctx,
+ dmlOptions.getDialect(),
+ dmlOptions.getFieldNames(),
+ logicalTypes,
+ sql,
+ rowDataTypeInformation),
+ JdbcOutputFormat.RecordExtractor.identity(),
+ inlongMetric,
+ auditHostAndPorts,
+ auditKeys);
+ }
+ }
+
+ private static JdbcBatchStatementExecutor<RowData> createBufferReduceExecutor(
+ JdbcDmlOptions opt,
+ RuntimeContext ctx,
+ TypeInformation<RowData> rowDataTypeInfo,
+ LogicalType[] fieldTypes) {
+ checkArgument(opt.getKeyFields().isPresent());
+ JdbcDialect dialect = opt.getDialect();
+ String tableName = opt.getTableName();
+ String[] pkNames = opt.getKeyFields().get();
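+ // resolve primary-key column names to their positions in the full field list, then collect their types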
+ int[] pkFields =
+ Arrays.stream(pkNames)
+ .mapToInt(Arrays.asList(opt.getFieldNames())::indexOf)
+ .toArray();
+ LogicalType[] pkTypes =
+ Arrays.stream(pkFields).mapToObj(f -> fieldTypes[f]).toArray(LogicalType[]::new);
+ final TypeSerializer<RowData> typeSerializer =
+ rowDataTypeInfo.createSerializer(ctx.getExecutionConfig());
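+ // with object reuse enabled, buffered rows must be deep-copied so later mutations cannot corrupt them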
+ final Function<RowData, RowData> valueTransform =
+ ctx.getExecutionConfig().isObjectReuseEnabled()
+ ? typeSerializer::copy
+ : Function.identity();
+
+ return new TableBufferReducedStatementExecutor(
+ createUpsertRowExecutor(
+ dialect,
+ tableName,
+ opt.getFieldNames(),
+ fieldTypes,
+ pkFields,
+ pkNames,
+ pkTypes),
+ createDeleteExecutor(dialect, tableName, pkNames, pkTypes),
+ createRowKeyExtractor(fieldTypes, pkFields),
+ valueTransform);
+ }
+
+ private static JdbcBatchStatementExecutor<RowData> createSimpleBufferedExecutor(
+ RuntimeContext ctx,
+ JdbcDialect dialect,
+ String[] fieldNames,
+ LogicalType[] fieldTypes,
+ String sql,
+ TypeInformation<RowData> rowDataTypeInfo) {
+ final TypeSerializer<RowData> typeSerializer =
+ rowDataTypeInfo.createSerializer(ctx.getExecutionConfig());
+ return new TableBufferedStatementExecutor(
+ createSimpleRowExecutor(dialect, fieldNames, fieldTypes, sql),
+ ctx.getExecutionConfig().isObjectReuseEnabled()
+ ? typeSerializer::copy
+ : Function.identity());
+ }
+
+ private static JdbcBatchStatementExecutor<RowData> createUpsertRowExecutor(
+ JdbcDialect dialect,
+ String tableName,
+ String[] fieldNames,
+ LogicalType[] fieldTypes,
+ int[] pkFields,
+ String[] pkNames,
+ LogicalType[] pkTypes) {
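+ // prefer the dialect's native upsert statement; fall back to an exists-check plus insert/update when none is provided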
+ return dialect.getUpsertStatement(tableName, fieldNames, pkNames)
+ .map(sql -> createSimpleRowExecutor(dialect, fieldNames, fieldTypes, sql))
+ .orElseGet(
+ () -> createInsertOrUpdateExecutor(
+ dialect,
+ tableName,
+ fieldNames,
+ fieldTypes,
+ pkFields,
+ pkNames,
+ pkTypes));
+ }
+
+ private static JdbcBatchStatementExecutor<RowData> createDeleteExecutor(
+ JdbcDialect dialect, String tableName, String[] pkNames, LogicalType[] pkTypes) {
+ String deleteSql = dialect.getDeleteStatement(tableName, pkNames);
+ return createSimpleRowExecutor(dialect, pkNames, pkTypes, deleteSql);
+ }
+
+ private static JdbcBatchStatementExecutor<RowData> createSimpleRowExecutor(
+ JdbcDialect dialect, String[] fieldNames, LogicalType[] fieldTypes, final String sql) {
+ final JdbcRowConverter rowConverter = dialect.getRowConverter(RowType.of(fieldTypes));
+ return new TableSimpleStatementExecutor(
+ connection -> FieldNamedPreparedStatement.prepareStatement(connection, sql, fieldNames),
+ rowConverter);
+ }
+
+ private static JdbcBatchStatementExecutor<RowData> createInsertOrUpdateExecutor(
+ JdbcDialect dialect,
+ String tableName,
+ String[] fieldNames,
+ LogicalType[] fieldTypes,
+ int[] pkFields,
+ String[] pkNames,
+ LogicalType[] pkTypes) {
+ final String existStmt = dialect.getRowExistsStatement(tableName, pkNames);
+ final String insertStmt = dialect.getInsertIntoStatement(tableName, fieldNames);
+ final String updateStmt = dialect.getUpdateStatement(tableName, fieldNames, pkNames);
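+ // probe row existence by key first, then route each record to the INSERT or UPDATE statement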
+ return new TableInsertOrUpdateStatementExecutor(
+ connection -> FieldNamedPreparedStatement.prepareStatement(
+ connection, existStmt, pkNames),
+ connection -> FieldNamedPreparedStatement.prepareStatement(
+ connection, insertStmt, fieldNames),
+ connection -> FieldNamedPreparedStatement.prepareStatement(
+ connection, updateStmt, fieldNames),
+ dialect.getRowConverter(RowType.of(pkTypes)),
+ dialect.getRowConverter(RowType.of(fieldTypes)),
+ dialect.getRowConverter(RowType.of(fieldTypes)),
+ createRowKeyExtractor(fieldTypes, pkFields));
+ }
+
+ private static Function<RowData, RowData> createRowKeyExtractor(
+ LogicalType[] logicalTypes, int[] pkFields) {
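+ // one field getter per primary-key column; incoming rows are projected down to their key fields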
+ final RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[pkFields.length];
+ for (int i = 0; i < pkFields.length; i++) {
+ fieldGetters[i] = createFieldGetter(logicalTypes[pkFields[i]], pkFields[i]);
+ }
+ return row -> getPrimaryKey(row, fieldGetters);
+ }
+
+ private static RowData getPrimaryKey(RowData row, RowData.FieldGetter[] fieldGetters) {
+ GenericRowData pkRow = new GenericRowData(fieldGetters.length);
+ for (int i = 0; i < fieldGetters.length; i++) {
+ pkRow.setField(i, fieldGetters[i].getFieldOrNull(row));
+ }
+ return pkRow;
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 00000000000..a14e9cc4401
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
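+# Flink's table factory discovery loads this class at runtime via java.util.ServiceLoader.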
+org.apache.inlong.sort.jdbc.table.JdbcDynamicTableFactory
\ No newline at end of file
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/resources/META-INF/services/org.apache.inlong.sort.jdbc.dialect.JdbcDialectFactory b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/resources/META-INF/services/org.apache.inlong.sort.jdbc.dialect.JdbcDialectFactory
new file mode 100644
index 00000000000..0034faa2c1f
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/resources/META-INF/services/org.apache.inlong.sort.jdbc.dialect.JdbcDialectFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
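+# JDBC dialect implementations are discovered at runtime via java.util.ServiceLoader; register additional dialect factories here.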
+org.apache.inlong.sort.jdbc.dialect.clickhouse.ClickHouseDialectFactory
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml
index c753aa538f8..a3efa8b01ea 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pom.xml
@@ -44,6 +44,7 @@
hudi
kafka
redis
+ jdbc
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index b33808eb954..126eebbdc70 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -893,15 +893,6 @@ License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
Source : com.ververica:flink-connector-sqlserver-cdc:2.3.0 (Please note that the software have been modified.)
License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
-1.3.27 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java
- inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MySqlTableSource.java
- inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java
- inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSourceBuilder.java
- inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSource.java
- inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/table/MySqlReadableMetadata.java
-Source : com.ververica:flink-connector-mysql-cdc:2.3.0 (Please note that the software have been modified.)
-License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
-
1.3.26 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/append/AppendWriteFunction.java
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/append/AppendWriteOperator.java
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bulk/BulkInsertWriteFunction.java
@@ -915,6 +906,23 @@ License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
Source : org.apache.hudi:hudi-flink1.15-bundle:0.12.3 (Please note that the software have been modified.)
License : https://github.com/apache/hudi/blob/master/LICENSE
+1.3.27 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java
+ inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MySqlTableSource.java
+ inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java
+ inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSourceBuilder.java
+ inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSource.java
+ inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/table/MySqlReadableMetadata.java
+Source : com.ververica:flink-connector-mysql-cdc:2.3.0 (Please note that the software have been modified.)
+License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
+
+1.3.28 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
+ inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableSink.java
+ inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcOutputFormatBuilder.java
+ inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java
+ inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcOutputFormat.java
+ inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableJdbcUpsertOutputFormat.java
+Source : org.apache.flink:flink-connector-jdbc:1.15.4 (Please note that the software have been modified.)
+License : https://github.com/apache/flink/blob/master/LICENSE
=======================================================================