Skip to content

Commit

Permalink
[INLONG-10297][Sort] Fix mysql connector cannot submit to flink cluster (#10301)
Browse files Browse the repository at this point in the history
  • Loading branch information
EMsnap committed May 29, 2024
1 parent d43e1e8 commit 4555ebf
Show file tree
Hide file tree
Showing 6 changed files with 514 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
package org.apache.inlong.sort.mysql;

import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.SourceMetricData;
import org.apache.inlong.sort.mysql.source.MySqlSource;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.MySqlDeserializationConverterFactory;
import com.ververica.cdc.connectors.mysql.table.MySqlReadableMetadata;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
Expand Down Expand Up @@ -182,7 +181,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
.setServerTimeZone(serverTimeZone)
.setUserDefinedConverterFactory(
MySqlDeserializationConverterFactory.instance())
.setSourceMetricData(metricOption == null ? null : new SourceMetricData(metricOption))
.setMetricOption(metricOption)
.build();
if (enableParallelRead) {
MySqlSource<RowData> parallelSource =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.inlong.sort.mysql;

import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricsCollector;
import org.apache.inlong.sort.base.metric.SourceMetricData;

Expand Down Expand Up @@ -104,7 +105,8 @@ public interface ValueValidator extends Serializable {

/** Changelog Mode to use for encoding changes in Flink internal data structure. */
private final DebeziumChangelogMode changelogMode;
private final SourceMetricData sourceMetricData;
private SourceMetricData sourceMetricData;
private final MetricOption metricOption;

/** Returns a builder to build {@link RowDataDebeziumDeserializeSchema}. */
public static Builder newBuilder() {
Expand All @@ -119,7 +121,7 @@ public static Builder newBuilder() {
ZoneId serverTimeZone,
DeserializationRuntimeConverterFactory userDefinedConverterFactory,
DebeziumChangelogMode changelogMode,
SourceMetricData sourceMetricData) {
MetricOption metricOption) {
this.hasMetadata = checkNotNull(metadataConverters).length > 0;
this.appendMetadataCollector = new AppendMetadataCollector(metadataConverters);
this.physicalConverter =
Expand All @@ -130,7 +132,7 @@ public static Builder newBuilder() {
this.resultTypeInfo = checkNotNull(resultTypeInfo);
this.validator = checkNotNull(validator);
this.changelogMode = checkNotNull(changelogMode);
this.sourceMetricData = sourceMetricData;
this.metricOption = metricOption;
}

@Override
Expand All @@ -150,6 +152,9 @@ public void deserialize(SourceRecord record, Collector<RowData> out) throws Exce
GenericRowData delete = extractBeforeRow(value, valueSchema);
validator.validate(delete, RowKind.DELETE);
delete.setRowKind(RowKind.DELETE);
if (sourceMetricData != null) {
out = new MetricsCollector<>(out, sourceMetricData);
}
emit(record, delete, out);
} else {
if (changelogMode == DebeziumChangelogMode.ALL) {
Expand All @@ -169,6 +174,17 @@ public void deserialize(SourceRecord record, Collector<RowData> out) throws Exce
}
}

/**
* Initialize the source metric data.
* Must be called on flink cluster rather than flink client
* Since the audit operator is not serializable
*/
public void initSourceMetricData() {
if (metricOption != null) {
this.sourceMetricData = new SourceMetricData(metricOption);
}
}

private GenericRowData extractAfterRow(Struct value, Schema valueSchema) throws Exception {
Schema afterSchema = valueSchema.field(Envelope.FieldName.AFTER).schema();
Struct after = value.getStruct(Envelope.FieldName.AFTER);
Expand Down Expand Up @@ -212,7 +228,7 @@ public static class Builder {
private DeserializationRuntimeConverterFactory userDefinedConverterFactory =
DeserializationRuntimeConverterFactory.DEFAULT;
private DebeziumChangelogMode changelogMode = DebeziumChangelogMode.ALL;
private SourceMetricData sourceMetricData;
private MetricOption metricOption;

public Builder setPhysicalRowType(RowType physicalRowType) {
this.physicalRowType = physicalRowType;
Expand Down Expand Up @@ -249,8 +265,9 @@ public Builder setChangelogMode(DebeziumChangelogMode changelogMode) {
this.changelogMode = changelogMode;
return this;
}
public Builder setSourceMetricData(SourceMetricData sourceMetricData) {
this.sourceMetricData = sourceMetricData;

public Builder setMetricOption(MetricOption metricOption) {
this.metricOption = metricOption;
return this;
}

Expand All @@ -263,7 +280,7 @@ public RowDataDebeziumDeserializeSchema build() {
serverTimeZone,
userDefinedConverterFactory,
changelogMode,
sourceMetricData);
metricOption);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.mysql.source;

import org.apache.inlong.sort.mysql.RowDataDebeziumDeserializeSchema;

import com.ververica.cdc.connectors.mysql.MySqlValidator;
import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
import com.ververica.cdc.connectors.mysql.source.assigners.MySqlBinlogSplitAssigner;
import com.ververica.cdc.connectors.mysql.source.assigners.MySqlHybridSplitAssigner;
import com.ververica.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner;
import com.ververica.cdc.connectors.mysql.source.assigners.state.BinlogPendingSplitsState;
import com.ververica.cdc.connectors.mysql.source.assigners.state.HybridPendingSplitsState;
import com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsState;
import com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator;
import com.ververica.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics;
import com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter;
import com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader;
import com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReaderContext;
import com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplitSerializer;
import com.ververica.cdc.connectors.mysql.source.split.SourceRecords;
import com.ververica.cdc.connectors.mysql.table.StartupMode;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.jdbc.JdbcConnection;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.util.FlinkRuntimeException;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.function.Supplier;

import static com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.openJdbcConnection;

/**
* The MySQL CDC Source based on FLIP-27 and Watermark Signal Algorithm which supports parallel
* reading snapshot of table and then continue to capture data change from binlog.
*
* <pre>
* 1. The source supports parallel capturing table change.
* 2. The source supports checkpoint in split level when read snapshot data.
* 3. The source doesn't need apply any lock of MySQL.
* </pre>
*
* <pre>{@code
* MySqlSource
* .<String>builder()
* .hostname("localhost")
* .port(3306)
* .databaseList("mydb")
* .tableList("mydb.users")
* .username(username)
* .password(password)
* .serverId(5400)
* .deserializer(new JsonDebeziumDeserializationSchema())
* .build();
* }</pre>
*
* @param <T> the output type of the source.
*
* Copied from com.ververica:flink-connector-mysql-cdc-2.3.0, added with Inlong metrics support
*
*/
@Internal
public class MySqlSource<T>
implements
Source<T, MySqlSplit, PendingSplitsState>,
ResultTypeQueryable<T> {

private static final long serialVersionUID = 1L;

private final MySqlSourceConfigFactory configFactory;
private final DebeziumDeserializationSchema<T> deserializationSchema;

/**
* Get a MySqlParallelSourceBuilder to build a {@link MySqlSource}.
*
* @return a MySql parallel source builder.
*/
@PublicEvolving
public static <T> MySqlSourceBuilder<T> builder() {
return new MySqlSourceBuilder<>();
}

MySqlSource(
MySqlSourceConfigFactory configFactory,
DebeziumDeserializationSchema<T> deserializationSchema) {
this.configFactory = configFactory;
this.deserializationSchema = deserializationSchema;
}

public MySqlSourceConfigFactory getConfigFactory() {
return configFactory;
}

@Override
public Boundedness getBoundedness() {
return Boundedness.CONTINUOUS_UNBOUNDED;
}

@Override
public SourceReader<T, MySqlSplit> createReader(SourceReaderContext readerContext)
throws Exception {
// create source config for the given subtask (e.g. unique server id)
MySqlSourceConfig sourceConfig =
configFactory.createConfig(readerContext.getIndexOfSubtask());
FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>> elementsQueue =
new FutureCompletingBlockingQueue<>();

final Method metricGroupMethod = readerContext.getClass().getMethod("metricGroup");
metricGroupMethod.setAccessible(true);
final MetricGroup metricGroup = (MetricGroup) metricGroupMethod.invoke(readerContext);

final MySqlSourceReaderMetrics sourceReaderMetrics =
new MySqlSourceReaderMetrics(metricGroup);
sourceReaderMetrics.registerMetrics();
MySqlSourceReaderContext mySqlSourceReaderContext =
new MySqlSourceReaderContext(readerContext);
Supplier<MySqlSplitReader> splitReaderSupplier =
() -> new MySqlSplitReader(
sourceConfig,
readerContext.getIndexOfSubtask(),
mySqlSourceReaderContext);

if (deserializationSchema instanceof RowDataDebeziumDeserializeSchema) {
((RowDataDebeziumDeserializeSchema) deserializationSchema).initSourceMetricData();
}

return new MySqlSourceReader<>(
elementsQueue,
splitReaderSupplier,
new MySqlRecordEmitter<>(
deserializationSchema,
sourceReaderMetrics,
sourceConfig.isIncludeSchemaChanges()),
readerContext.getConfiguration(),
mySqlSourceReaderContext,
sourceConfig);
}

@Override
public SplitEnumerator<MySqlSplit, PendingSplitsState> createEnumerator(
SplitEnumeratorContext<MySqlSplit> enumContext) {
MySqlSourceConfig sourceConfig = configFactory.createConfig(0);

final MySqlValidator validator = new MySqlValidator(sourceConfig);
validator.validate();

final MySqlSplitAssigner splitAssigner;
if (sourceConfig.getStartupOptions().startupMode == StartupMode.INITIAL) {
try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
boolean isTableIdCaseSensitive = DebeziumUtils.isTableIdCaseSensitive(jdbc);
splitAssigner =
new MySqlHybridSplitAssigner(
sourceConfig,
enumContext.currentParallelism(),
new ArrayList<>(),
isTableIdCaseSensitive);
} catch (Exception e) {
throw new FlinkRuntimeException(
"Failed to discover captured tables for enumerator", e);
}
} else {
splitAssigner = new MySqlBinlogSplitAssigner(sourceConfig);
}

return new MySqlSourceEnumerator(enumContext, sourceConfig, splitAssigner);
}

@Override
public SplitEnumerator<MySqlSplit, PendingSplitsState> restoreEnumerator(
SplitEnumeratorContext<MySqlSplit> enumContext, PendingSplitsState checkpoint) {
MySqlSourceConfig sourceConfig = configFactory.createConfig(0);

final MySqlSplitAssigner splitAssigner;
if (checkpoint instanceof HybridPendingSplitsState) {
splitAssigner =
new MySqlHybridSplitAssigner(
sourceConfig,
enumContext.currentParallelism(),
(HybridPendingSplitsState) checkpoint);
} else if (checkpoint instanceof BinlogPendingSplitsState) {
splitAssigner =
new MySqlBinlogSplitAssigner(
sourceConfig, (BinlogPendingSplitsState) checkpoint);
} else {
throw new UnsupportedOperationException(
"Unsupported restored PendingSplitsState: " + checkpoint);
}
return new MySqlSourceEnumerator(enumContext, sourceConfig, splitAssigner);
}

@Override
public SimpleVersionedSerializer<MySqlSplit> getSplitSerializer() {
return MySqlSplitSerializer.INSTANCE;
}

@Override
public SimpleVersionedSerializer<PendingSplitsState> getEnumeratorCheckpointSerializer() {
return new PendingSplitsStateSerializer(getSplitSerializer());
}

@Override
public TypeInformation<T> getProducedType() {
return deserializationSchema.getProducedType();
}
}
Loading

0 comments on commit 4555ebf

Please sign in to comment.