From 4555ebfe1c959399f8aedd57c136a988a02ed13c Mon Sep 17 00:00:00 2001 From: ZiruiPeng Date: Wed, 29 May 2024 17:00:30 +0800 Subject: [PATCH] [INLONG-10297][Sort] Fix mysql connector cannot submit to flink cluster (#10301) --- .../sort-connectors/mysql-cdc/pom.xml | 1 + .../inlong/sort/mysql/MySqlTableSource.java | 5 +- .../RowDataDebeziumDeserializeSchema.java | 31 ++- .../inlong/sort/mysql/source/MySqlSource.java | 239 +++++++++++++++++ .../sort/mysql/source/MySqlSourceBuilder.java | 246 ++++++++++++++++++ licenses/inlong-sort-connectors/LICENSE | 2 + 6 files changed, 514 insertions(+), 10 deletions(-) create mode 100644 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSource.java create mode 100644 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSourceBuilder.java diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/pom.xml index 023491c4108..1e462a47c26 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/pom.xml +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/pom.xml @@ -60,6 +60,7 @@ mysql mysql-connector-java + compile org.apache.kafka diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MySqlTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MySqlTableSource.java index c244378acaf..b766150c6d1 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MySqlTableSource.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MySqlTableSource.java @@ -18,9 +18,8 @@ package org.apache.inlong.sort.mysql; import org.apache.inlong.sort.base.metric.MetricOption; -import org.apache.inlong.sort.base.metric.SourceMetricData; +import org.apache.inlong.sort.mysql.source.MySqlSource; -import com.ververica.cdc.connectors.mysql.source.MySqlSource; import com.ververica.cdc.connectors.mysql.table.MySqlDeserializationConverterFactory; import com.ververica.cdc.connectors.mysql.table.MySqlReadableMetadata; import com.ververica.cdc.connectors.mysql.table.StartupOptions; @@ -182,7 +181,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { .setServerTimeZone(serverTimeZone) .setUserDefinedConverterFactory( MySqlDeserializationConverterFactory.instance()) - .setSourceMetricData(metricOption == null ? null : new SourceMetricData(metricOption)) + .setMetricOption(metricOption) .build(); if (enableParallelRead) { MySqlSource parallelSource = diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java index e98a700b623..6a92e919753 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java @@ -17,6 +17,7 @@ package org.apache.inlong.sort.mysql; +import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricsCollector; import org.apache.inlong.sort.base.metric.SourceMetricData; @@ -104,7 +105,8 @@ public interface ValueValidator extends Serializable { /** Changelog Mode to use for encoding changes in Flink internal data structure. */ private final DebeziumChangelogMode changelogMode; - private final SourceMetricData sourceMetricData; + private SourceMetricData sourceMetricData; + private final MetricOption metricOption; /** Returns a builder to build {@link RowDataDebeziumDeserializeSchema}. */ public static Builder newBuilder() { @@ -119,7 +121,7 @@ public static Builder newBuilder() { ZoneId serverTimeZone, DeserializationRuntimeConverterFactory userDefinedConverterFactory, DebeziumChangelogMode changelogMode, - SourceMetricData sourceMetricData) { + MetricOption metricOption) { this.hasMetadata = checkNotNull(metadataConverters).length > 0; this.appendMetadataCollector = new AppendMetadataCollector(metadataConverters); this.physicalConverter = @@ -130,7 +132,7 @@ public static Builder newBuilder() { this.resultTypeInfo = checkNotNull(resultTypeInfo); this.validator = checkNotNull(validator); this.changelogMode = checkNotNull(changelogMode); - this.sourceMetricData = sourceMetricData; + this.metricOption = metricOption; } @Override @@ -150,6 +152,9 @@ public void deserialize(SourceRecord record, Collector out) throws Exce GenericRowData delete = extractBeforeRow(value, valueSchema); validator.validate(delete, RowKind.DELETE); delete.setRowKind(RowKind.DELETE); + if (sourceMetricData != null) { + out = new MetricsCollector<>(out, sourceMetricData); + } emit(record, delete, out); } else { if (changelogMode == DebeziumChangelogMode.ALL) { @@ -169,6 +174,17 @@ public void deserialize(SourceRecord record, Collector out) throws Exce } } + /** + * Initialize the source metric data. + * Must be called on flink cluster rather than flink client + * Since the audit operator is not serializable + */ + public void initSourceMetricData() { + if (metricOption != null) { + this.sourceMetricData = new SourceMetricData(metricOption); + } + } + private GenericRowData extractAfterRow(Struct value, Schema valueSchema) throws Exception { Schema afterSchema = valueSchema.field(Envelope.FieldName.AFTER).schema(); Struct after = value.getStruct(Envelope.FieldName.AFTER); @@ -212,7 +228,7 @@ public static class Builder { private DeserializationRuntimeConverterFactory userDefinedConverterFactory = DeserializationRuntimeConverterFactory.DEFAULT; private DebeziumChangelogMode changelogMode = DebeziumChangelogMode.ALL; - private SourceMetricData sourceMetricData; + private MetricOption metricOption; public Builder setPhysicalRowType(RowType physicalRowType) { this.physicalRowType = physicalRowType; @@ -249,8 +265,9 @@ public Builder setChangelogMode(DebeziumChangelogMode changelogMode) { this.changelogMode = changelogMode; return this; } - public Builder setSourceMetricData(SourceMetricData sourceMetricData) { - this.sourceMetricData = sourceMetricData; + + public Builder setMetricOption(MetricOption metricOption) { + this.metricOption = metricOption; return this; } @@ -263,7 +280,7 @@ public RowDataDebeziumDeserializeSchema build() { serverTimeZone, userDefinedConverterFactory, changelogMode, - sourceMetricData); + metricOption); } } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSource.java new file mode 100644 index 00000000000..801d1723724 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSource.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.mysql.source; + +import org.apache.inlong.sort.mysql.RowDataDebeziumDeserializeSchema; + +import com.ververica.cdc.connectors.mysql.MySqlValidator; +import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils; +import com.ververica.cdc.connectors.mysql.source.assigners.MySqlBinlogSplitAssigner; +import com.ververica.cdc.connectors.mysql.source.assigners.MySqlHybridSplitAssigner; +import com.ververica.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner; +import com.ververica.cdc.connectors.mysql.source.assigners.state.BinlogPendingSplitsState; +import com.ververica.cdc.connectors.mysql.source.assigners.state.HybridPendingSplitsState; +import com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsState; +import com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer; +import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig; +import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory; +import com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator; +import com.ververica.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics; +import com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter; +import com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader; +import com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReaderContext; +import com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader; +import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit; +import com.ververica.cdc.connectors.mysql.source.split.MySqlSplitSerializer; +import com.ververica.cdc.connectors.mysql.source.split.SourceRecords; +import com.ververica.cdc.connectors.mysql.table.StartupMode; +import com.ververica.cdc.debezium.DebeziumDeserializationSchema; +import io.debezium.jdbc.JdbcConnection; +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.FlinkRuntimeException; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.function.Supplier; + +import static com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.openJdbcConnection; + +/** + * The MySQL CDC Source based on FLIP-27 and Watermark Signal Algorithm which supports parallel + * reading snapshot of table and then continue to capture data change from binlog. + * + *
+ *     1. The source supports parallel capturing table change.
+ *     2. The source supports checkpoint in split level when read snapshot data.
+ *     3. The source doesn't need apply any lock of MySQL.
+ * 
+ * + *
{@code
+ * MySqlSource
+ *     .builder()
+ *     .hostname("localhost")
+ *     .port(3306)
+ *     .databaseList("mydb")
+ *     .tableList("mydb.users")
+ *     .username(username)
+ *     .password(password)
+ *     .serverId(5400)
+ *     .deserializer(new JsonDebeziumDeserializationSchema())
+ *     .build();
+ * }
+ * + * @param the output type of the source. + * + * Copied from com.ververica:flink-connector-mysql-cdc-2.3.0, added with Inlong metrics support + * + */ +@Internal +public class MySqlSource + implements + Source, + ResultTypeQueryable { + + private static final long serialVersionUID = 1L; + + private final MySqlSourceConfigFactory configFactory; + private final DebeziumDeserializationSchema deserializationSchema; + + /** + * Get a MySqlParallelSourceBuilder to build a {@link MySqlSource}. + * + * @return a MySql parallel source builder. + */ + @PublicEvolving + public static MySqlSourceBuilder builder() { + return new MySqlSourceBuilder<>(); + } + + MySqlSource( + MySqlSourceConfigFactory configFactory, + DebeziumDeserializationSchema deserializationSchema) { + this.configFactory = configFactory; + this.deserializationSchema = deserializationSchema; + } + + public MySqlSourceConfigFactory getConfigFactory() { + return configFactory; + } + + @Override + public Boundedness getBoundedness() { + return Boundedness.CONTINUOUS_UNBOUNDED; + } + + @Override + public SourceReader createReader(SourceReaderContext readerContext) + throws Exception { + // create source config for the given subtask (e.g. unique server id) + MySqlSourceConfig sourceConfig = + configFactory.createConfig(readerContext.getIndexOfSubtask()); + FutureCompletingBlockingQueue> elementsQueue = + new FutureCompletingBlockingQueue<>(); + + final Method metricGroupMethod = readerContext.getClass().getMethod("metricGroup"); + metricGroupMethod.setAccessible(true); + final MetricGroup metricGroup = (MetricGroup) metricGroupMethod.invoke(readerContext); + + final MySqlSourceReaderMetrics sourceReaderMetrics = + new MySqlSourceReaderMetrics(metricGroup); + sourceReaderMetrics.registerMetrics(); + MySqlSourceReaderContext mySqlSourceReaderContext = + new MySqlSourceReaderContext(readerContext); + Supplier splitReaderSupplier = + () -> new MySqlSplitReader( + sourceConfig, + readerContext.getIndexOfSubtask(), + mySqlSourceReaderContext); + + if (deserializationSchema instanceof RowDataDebeziumDeserializeSchema) { + ((RowDataDebeziumDeserializeSchema) deserializationSchema).initSourceMetricData(); + } + + return new MySqlSourceReader<>( + elementsQueue, + splitReaderSupplier, + new MySqlRecordEmitter<>( + deserializationSchema, + sourceReaderMetrics, + sourceConfig.isIncludeSchemaChanges()), + readerContext.getConfiguration(), + mySqlSourceReaderContext, + sourceConfig); + } + + @Override + public SplitEnumerator createEnumerator( + SplitEnumeratorContext enumContext) { + MySqlSourceConfig sourceConfig = configFactory.createConfig(0); + + final MySqlValidator validator = new MySqlValidator(sourceConfig); + validator.validate(); + + final MySqlSplitAssigner splitAssigner; + if (sourceConfig.getStartupOptions().startupMode == StartupMode.INITIAL) { + try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) { + boolean isTableIdCaseSensitive = DebeziumUtils.isTableIdCaseSensitive(jdbc); + splitAssigner = + new MySqlHybridSplitAssigner( + sourceConfig, + enumContext.currentParallelism(), + new ArrayList<>(), + isTableIdCaseSensitive); + } catch (Exception e) { + throw new FlinkRuntimeException( + "Failed to discover captured tables for enumerator", e); + } + } else { + splitAssigner = new MySqlBinlogSplitAssigner(sourceConfig); + } + + return new MySqlSourceEnumerator(enumContext, sourceConfig, splitAssigner); + } + + @Override + public SplitEnumerator restoreEnumerator( + SplitEnumeratorContext enumContext, PendingSplitsState checkpoint) { + MySqlSourceConfig sourceConfig = configFactory.createConfig(0); + + final MySqlSplitAssigner splitAssigner; + if (checkpoint instanceof HybridPendingSplitsState) { + splitAssigner = + new MySqlHybridSplitAssigner( + sourceConfig, + enumContext.currentParallelism(), + (HybridPendingSplitsState) checkpoint); + } else if (checkpoint instanceof BinlogPendingSplitsState) { + splitAssigner = + new MySqlBinlogSplitAssigner( + sourceConfig, (BinlogPendingSplitsState) checkpoint); + } else { + throw new UnsupportedOperationException( + "Unsupported restored PendingSplitsState: " + checkpoint); + } + return new MySqlSourceEnumerator(enumContext, sourceConfig, splitAssigner); + } + + @Override + public SimpleVersionedSerializer getSplitSerializer() { + return MySqlSplitSerializer.INSTANCE; + } + + @Override + public SimpleVersionedSerializer getEnumeratorCheckpointSerializer() { + return new PendingSplitsStateSerializer(getSplitSerializer()); + } + + @Override + public TypeInformation getProducedType() { + return deserializationSchema.getProducedType(); + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSourceBuilder.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSourceBuilder.java new file mode 100644 index 00000000000..775da14035e --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSourceBuilder.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.mysql.source; + +import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory; +import com.ververica.cdc.connectors.mysql.table.StartupOptions; +import com.ververica.cdc.debezium.DebeziumDeserializationSchema; +import org.apache.flink.annotation.PublicEvolving; + +import java.time.Duration; +import java.util.Properties; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * + *
{@code
+ * MySqlSource
+ *     .builder()
+ *     .hostname("localhost")
+ *     .port(3306)
+ *     .databaseList("mydb")
+ *     .tableList("mydb.users")
+ *     .username(username)
+ *     .password(password)
+ *     .serverId(5400)
+ *     .deserializer(new JsonDebeziumDeserializationSchema())
+ *     .build();
+ * }
+ * + * Copied from com.ververica:flink-connector-mysql-cdc-2.3.0, added with Inlong metrics support + * + */ +@PublicEvolving +public class MySqlSourceBuilder { + + private final MySqlSourceConfigFactory configFactory = new MySqlSourceConfigFactory(); + private DebeziumDeserializationSchema deserializer; + + public MySqlSourceBuilder hostname(String hostname) { + this.configFactory.hostname(hostname); + return this; + } + + /** Integer port number of the MySQL database server. */ + public MySqlSourceBuilder port(int port) { + this.configFactory.port(port); + return this; + } + + /** + * An required list of regular expressions that match database names to be monitored; any + * database name not included in the whitelist will be excluded from monitoring. + */ + public MySqlSourceBuilder databaseList(String... databaseList) { + this.configFactory.databaseList(databaseList); + return this; + } + + /** + * An required list of regular expressions that match fully-qualified table identifiers for + * tables to be monitored; any table not included in the list will be excluded from monitoring. + * Each identifier is of the form {@code .}. + */ + public MySqlSourceBuilder tableList(String... tableList) { + this.configFactory.tableList(tableList); + return this; + } + + /** Name of the MySQL database to use when connecting to the MySQL database server. */ + public MySqlSourceBuilder username(String username) { + this.configFactory.username(username); + return this; + } + + /** Password to use when connecting to the MySQL database server. */ + public MySqlSourceBuilder password(String password) { + this.configFactory.password(password); + return this; + } + + /** + * A numeric ID or a numeric ID range of this database client, The numeric ID syntax is like + * '5400', the numeric ID range syntax is like '5400-5408', The numeric ID range syntax is + * required when 'scan.incremental.snapshot.enabled' enabled. Every ID must be unique across all + * currently-running database processes in the MySQL cluster. This connector joins the MySQL + * cluster as another server (with this unique ID) so it can read the binlog. By default, a + * random number is generated between 5400 and 6400, though we recommend setting an explicit + * value." + */ + public MySqlSourceBuilder serverId(String serverId) { + this.configFactory.serverId(serverId); + return this; + } + + /** + * The session time zone in database server, e.g. "America/Los_Angeles". It controls how the + * TIMESTAMP type in MYSQL converted to STRING. See more + * https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-temporal-types + */ + public MySqlSourceBuilder serverTimeZone(String timeZone) { + this.configFactory.serverTimeZone(timeZone); + return this; + } + + /** + * The chunk key of table snapshot, captured tables are split into multiple chunks by the chunk + * key column when read the snapshot of table. + */ + public MySqlSourceBuilder chunkKeyColumn(String chunkKeyColumn) { + this.configFactory.chunkKeyColumn(chunkKeyColumn); + return this; + } + + /** + * The split size (number of rows) of table snapshot, captured tables are split into multiple + * splits when read the snapshot of table. + */ + public MySqlSourceBuilder splitSize(int splitSize) { + this.configFactory.splitSize(splitSize); + return this; + } + + /** + * The group size of split meta, if the meta size exceeds the group size, the meta will be + * divided into multiple groups. + */ + public MySqlSourceBuilder splitMetaGroupSize(int splitMetaGroupSize) { + this.configFactory.splitMetaGroupSize(splitMetaGroupSize); + return this; + } + + /** + * The upper bound of split key evenly distribution factor, the factor is used to determine + * whether the table is evenly distribution or not. + */ + public MySqlSourceBuilder distributionFactorUpper(double distributionFactorUpper) { + this.configFactory.distributionFactorUpper(distributionFactorUpper); + return this; + } + + /** + * The lower bound of split key evenly distribution factor, the factor is used to determine + * whether the table is evenly distribution or not. + */ + public MySqlSourceBuilder distributionFactorLower(double distributionFactorLower) { + this.configFactory.distributionFactorLower(distributionFactorLower); + return this; + } + + /** The maximum fetch size for per poll when read table snapshot. */ + public MySqlSourceBuilder fetchSize(int fetchSize) { + this.configFactory.fetchSize(fetchSize); + return this; + } + + /** + * The maximum time that the connector should wait after trying to connect to the MySQL database + * server before timing out. + */ + public MySqlSourceBuilder connectTimeout(Duration connectTimeout) { + this.configFactory.connectTimeout(connectTimeout); + return this; + } + + /** The max retry times to get connection. */ + public MySqlSourceBuilder connectMaxRetries(int connectMaxRetries) { + this.configFactory.connectMaxRetries(connectMaxRetries); + return this; + } + + /** The connection pool size. */ + public MySqlSourceBuilder connectionPoolSize(int connectionPoolSize) { + this.configFactory.connectionPoolSize(connectionPoolSize); + return this; + } + + /** Whether the MySqlSource should output the schema changes or not. */ + public MySqlSourceBuilder includeSchemaChanges(boolean includeSchemaChanges) { + this.configFactory.includeSchemaChanges(includeSchemaChanges); + return this; + } + + /** Whether the MySqlSource should scan the newly added tables or not. */ + public MySqlSourceBuilder scanNewlyAddedTableEnabled(boolean scanNewlyAddedTableEnabled) { + this.configFactory.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled); + return this; + } + + /** Specifies the startup options. */ + public MySqlSourceBuilder startupOptions(StartupOptions startupOptions) { + this.configFactory.startupOptions(startupOptions); + return this; + } + + /** Custom properties that will overwrite the default JDBC connection URL. */ + public MySqlSourceBuilder jdbcProperties(Properties jdbcProperties) { + this.configFactory.jdbcProperties(jdbcProperties); + return this; + } + + /** The Debezium MySQL connector properties. For example, "snapshot.mode". */ + public MySqlSourceBuilder debeziumProperties(Properties properties) { + this.configFactory.debeziumProperties(properties); + return this; + } + + /** + * The deserializer used to convert from consumed {@link + * org.apache.kafka.connect.source.SourceRecord}. + */ + public MySqlSourceBuilder deserializer(DebeziumDeserializationSchema deserializer) { + this.deserializer = deserializer; + return this; + } + + /** The interval of heartbeat event. */ + public MySqlSourceBuilder heartbeatInterval(Duration heartbeatInterval) { + this.configFactory.heartbeatInterval(heartbeatInterval); + return this; + } + + /** + * Build the MySqlSource + * + * @return a MySqlParallelSource with the settings made for this builder. + */ + public MySqlSource build() { + return new MySqlSource<>(configFactory, checkNotNull(deserializer)); + } +} diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE index 7be72aace2a..5a3bf7423ad 100644 --- a/licenses/inlong-sort-connectors/LICENSE +++ b/licenses/inlong-sort-connectors/LICENSE @@ -877,6 +877,8 @@ License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE 1.3.27 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MySqlTableSource.java inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSourceBuilder.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSource.java Source : com.ververica:flink-connector-mysql-cdc:2.3.0 (Please note that the software have been modified.) License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE