From 53a6b121fda5901ae0183f4ea87881c0c4e4206e Mon Sep 17 00:00:00 2001 From: thisiswilli Date: Thu, 14 Sep 2023 18:01:18 +0800 Subject: [PATCH] [mysql] Mysql-cdc adapt mariadb. --- .../flink-connector-mysql-cdc/pom.xml | 7 + .../connector/mysql/MySqlConnection.java | 84 +++++-- .../MySqlStreamingChangeEventSource.java | 83 ++++--- .../mysql/debezium/DebeziumUtils.java | 29 ++- .../task/context/StatefulTaskContext.java | 26 ++- .../source/client/MariaDBBinaryLogClient.java | 69 ++++++ .../mysql/source/offset/BinlogOffset.java | 13 +- .../mysql/source/reader/MySqlSplitReader.java | 7 +- .../mysql/table/MariaDBConnectorITCase.java | 221 ++++++++++++++++++ .../mysql/table/MariaDBTestBase.java | 116 +++++++++ .../mysql/testutils/MariaDBContainer.java | 71 ++++++ .../test/resources/ddl/mariadb_inventory.sql | 57 +++++ 12 files changed, 732 insertions(+), 51 deletions(-) create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/client/MariaDBBinaryLogClient.java create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MariaDBConnectorITCase.java create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MariaDBTestBase.java create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MariaDBContainer.java create mode 100644 flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/ddl/mariadb_inventory.sql diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml index 8e6886e156..26e733cab4 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml @@ -183,6 +183,13 @@ limitations under the License. test + + org.testcontainers + mariadb + ${testcontainers.version} + test + + com.jayway.jsonpath json-path diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnection.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnection.java index 801e27d6b8..a1d8e50e3c 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnection.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnection.java @@ -23,6 +23,7 @@ import io.debezium.relational.history.DatabaseHistory; import io.debezium.schema.DatabaseSchema; import io.debezium.util.Strings; +import org.apache.kafka.connect.errors.ConnectException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,6 +38,7 @@ import java.util.Map; import java.util.OptionalLong; import java.util.Properties; +import java.util.concurrent.atomic.AtomicReference; /** * Copied from Debezium project(1.9.8.final) to add custom jdbc properties in the jdbc url. 
The new
@@ -74,6 +76,8 @@ public class MySqlConnection extends JdbcConnection {
 
     private final String urlPattern;
 
+    private final boolean isMariaDB;
+
     /**
      * Creates a new connection using the supplied configuration.
      *
@@ -90,6 +94,7 @@ public MySqlConnection(
         this.connectionConfig = connectionConfig;
         this.mysqlFieldReader = fieldReader;
         this.urlPattern = connectionConfig.getUrlPattern();
+        this.isMariaDB = queryIsMariadb();
     }
 
     /**
@@ -140,6 +145,28 @@ protected Map<String, String> readMySqlSystemVariables() {
         return querySystemVariables(SQL_SHOW_SYSTEM_VARIABLES);
     }
 
+    public boolean isMariaDB() {
+        return isMariaDB;
+    }
+
+    private boolean queryIsMariadb() {
+        AtomicReference<Boolean> mode = new AtomicReference<>(false);
+        try {
+            query(
+                    "SELECT VERSION()",
+                    rs -> {
+                        if (rs.next()) {
+                            String version = rs.getString(1);
+                            mode.set(version.toLowerCase().contains("mariadb"));
+                        }
+                    });
+        } catch (SQLException e) {
+            throw new ConnectException(
+                    "Unexpected error while connecting to MySQL and looking at the server version: ", e);
+        }
+        return mode.get();
+    }
+
     private Map<String, String> querySystemVariables(String statement) {
         final Map<String, String> variables = new HashMap<>();
         try {
@@ -252,14 +279,25 @@ protected String getSessionVariableForSslVersion() {
      */
     public boolean isGtidModeEnabled() {
         try {
-            return queryAndMap(
-                    "SHOW GLOBAL VARIABLES LIKE 'GTID_MODE'",
-                    rs -> {
-                        if (rs.next()) {
-                            return !"OFF".equalsIgnoreCase(rs.getString(2));
-                        }
-                        return false;
-                    });
+            if (isMariaDB) {
+                return queryAndMap(
+                        "SHOW GLOBAL VARIABLES LIKE 'GTID_CURRENT_POS'",
+                        rs -> {
+                            if (rs.next()) {
+                                return !"".equalsIgnoreCase(rs.getString(2));
+                            }
+                            return false;
+                        });
+            } else {
+                return queryAndMap(
+                        "SHOW GLOBAL VARIABLES LIKE 'GTID_MODE'",
+                        rs -> {
+                            if (rs.next()) {
+                                return !"OFF".equalsIgnoreCase(rs.getString(2));
+                            }
+                            return false;
+                        });
+            }
         } catch (SQLException e) {
             throw new DebeziumException(
                     "Unexpected error while connecting to MySQL and looking at GTID mode: ", e);
@@ -274,15 +312,27 @@ public boolean isGtidModeEnabled() {
      */
     public String knownGtidSet() {
         try {
-            return queryAndMap(
-                    "SHOW MASTER STATUS",
-                    rs -> {
-                        if (rs.next() && rs.getMetaData().getColumnCount() > 4) {
-                            return rs.getString(
-                                    5); // GTID set, may be null, blank, or contain a GTID set
-                        }
-                        return "";
-                    });
+            if (isMariaDB) {
+                return queryAndMap(
+                        "SHOW GLOBAL VARIABLES LIKE 'GTID_CURRENT_POS'",
+                        rs -> {
+                            if (rs.next()) {
+                                return rs.getString(
+                                        2); // GTID set, may be null, blank, or contain a GTID set
+                            }
+                            return "";
+                        });
+            } else {
+                return queryAndMap(
+                        "SHOW MASTER STATUS",
+                        rs -> {
+                            if (rs.next() && rs.getMetaData().getColumnCount() > 4) {
+                                return rs.getString(
+                                        5); // GTID set, may be null, blank, or contain a GTID set
+                            }
+                            return "";
+                        });
+            }
         } catch (SQLException e) {
             throw new DebeziumException(
                     "Unexpected error while connecting to MySQL and looking at GTID mode: ", e);
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
index 4f1c708994..a11fe736c1 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
@@ -15,6 +15,7 @@
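A note on the MySqlConnection changes above: the connector decides between the MySQL and MariaDB code paths once, at connection time, by inspecting the VERSION() string, and afterwards reads GTID state from gtid_current_pos (MariaDB) instead of GTID_MODE / SHOW MASTER STATUS (MySQL). The following is a minimal standalone sketch of that same detection over plain JDBC, outside Debezium's JdbcConnection helpers; the class name, JDBC URL, and credentials are illustrative placeholders and are not part of the patch.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;

    /** Illustrative sketch only; not part of the connector code. */
    public class MariaDbDetectionSketch {

        /** True when VERSION() reports a MariaDB build, e.g. "10.6.12-MariaDB-log". */
        static boolean isMariaDb(Connection connection) throws SQLException {
            try (Statement statement = connection.createStatement();
                    ResultSet rs = statement.executeQuery("SELECT VERSION()")) {
                return rs.next() && rs.getString(1).toLowerCase().contains("mariadb");
            }
        }

        /** Reads the GTID state the connector keys off: gtid_current_pos vs. GTID_MODE. */
        static String gtidState(Connection connection, boolean mariaDb) throws SQLException {
            String sql =
                    mariaDb
                            ? "SHOW GLOBAL VARIABLES LIKE 'GTID_CURRENT_POS'"
                            : "SHOW GLOBAL VARIABLES LIKE 'GTID_MODE'";
            try (Statement statement = connection.createStatement();
                    ResultSet rs = statement.executeQuery(sql)) {
                // Column 1 is the variable name, column 2 its value (empty means no GTIDs yet).
                return rs.next() ? rs.getString(2) : "";
            }
        }

        public static void main(String[] args) throws SQLException {
            // Placeholder URL and credentials; point these at the database under test.
            try (Connection connection =
                    DriverManager.getConnection(
                            "jdbc:mysql://localhost:3306/inventory", "flinkuser", "flinkpwd")) {
                boolean mariaDb = isMariaDb(connection);
                System.out.println(
                        "isMariaDB=" + mariaDb + ", gtidState=" + gtidState(connection, mariaDb));
            }
        }
    }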
import com.github.shyiko.mysql.binlog.event.EventHeaderV4; import com.github.shyiko.mysql.binlog.event.EventType; import com.github.shyiko.mysql.binlog.event.GtidEventData; +import com.github.shyiko.mysql.binlog.event.MariadbGtidEventData; import com.github.shyiko.mysql.binlog.event.QueryEventData; import com.github.shyiko.mysql.binlog.event.RotateEventData; import com.github.shyiko.mysql.binlog.event.RowsQueryEventData; @@ -24,6 +25,7 @@ import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException; import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer; import com.github.shyiko.mysql.binlog.event.deserialization.GtidEventDataDeserializer; +import com.github.shyiko.mysql.binlog.event.deserialization.MariadbGtidEventDataDeserializer; import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; import com.github.shyiko.mysql.binlog.network.AuthenticationException; import com.github.shyiko.mysql.binlog.network.DefaultSSLSocketFactory; @@ -311,6 +313,8 @@ public Event nextEvent(ByteArrayInputStream inputStream) throws IOException { // Add our custom deserializers ... eventDeserializer.setEventDataDeserializer(EventType.STOP, new StopEventDataDeserializer()); eventDeserializer.setEventDataDeserializer(EventType.GTID, new GtidEventDataDeserializer()); + eventDeserializer.setEventDataDeserializer( + EventType.MARIADB_GTID, new MariadbGtidEventDataDeserializer()); eventDeserializer.setEventDataDeserializer( EventType.WRITE_ROWS, new RowDeserializers.WriteRowsDeserializer(tableMapEventByTableId)); @@ -570,6 +574,15 @@ protected void handleGtidEvent(MySqlOffsetContext offsetContext, Event event) { metrics.onGtidChange(gtid); } + protected void handleMariaGtidEvent(MySqlOffsetContext offsetContext, Event event) { + LOGGER.debug("GTID transaction: {}", event); + MariadbGtidEventData gtidEvent = unwrapData(event); + String gtid = gtidEvent.toString(); + gtidSet.add(gtid); + offsetContext.startGtid(gtid, gtidSet.toString()); // rather than use the client's GTID set + metrics.onGtidChange(gtid); + } + /** * Handle the supplied event with an {@link RowsQueryEventData} by recording the original SQL * query that generated the event. @@ -1117,33 +1130,51 @@ public void execute( // the last Debezium checkpoint. String availableServerGtidStr = connection.knownGtidSet(); if (isGtidModeEnabled) { - // The server is using GTIDs, so enable the handler ... - eventHandlers.put( - EventType.GTID, (event) -> handleGtidEvent(effectiveOffsetContext, event)); - - // Now look at the GTID set from the server and what we've previously seen ... - GtidSet availableServerGtidSet = new GtidSet(availableServerGtidStr); - - // also take into account purged GTID logs - GtidSet purgedServerGtidSet = connection.purgedGtidSet(); - LOGGER.info("GTID set purged on server: {}", purgedServerGtidSet); - - GtidSet filteredGtidSet = - filterGtidSet( - effectiveOffsetContext, availableServerGtidSet, purgedServerGtidSet); - if (filteredGtidSet != null) { - // We've seen at least some GTIDs, so start reading from the filtered GTID set ... 
- LOGGER.info("Registering binlog reader with GTID set: {}", filteredGtidSet); - String filteredGtidSetStr = filteredGtidSet.toString(); - client.setGtidSet(filteredGtidSetStr); - effectiveOffsetContext.setCompletedGtidSet(filteredGtidSetStr); - gtidSet = new com.github.shyiko.mysql.binlog.GtidSet(filteredGtidSetStr); + if (connection.isMariaDB()) { + eventHandlers.put( + EventType.MARIADB_GTID, + (event) -> handleMariaGtidEvent(effectiveOffsetContext, event)); + if (offsetContext.gtidSet() != null) { + // MariaDB can not detect purged GTID logs. + client.setGtidSet(offsetContext.gtidSet()); + offsetContext.setCompletedGtidSet(offsetContext.gtidSet()); + gtidSet = + new com.github.shyiko.mysql.binlog.MariadbGtidSet( + offsetContext.gtidSet()); + } } else { - // We've not yet seen any GTIDs, so that means we have to start reading the binlog - // from the beginning ... - client.setBinlogFilename(effectiveOffsetContext.getSource().binlogFilename()); - client.setBinlogPosition(effectiveOffsetContext.getSource().binlogPosition()); - gtidSet = new com.github.shyiko.mysql.binlog.GtidSet(""); + // The server is using GTIDs, so enable the handler ... + eventHandlers.put( + EventType.GTID, (event) -> handleGtidEvent(effectiveOffsetContext, event)); + + // Now look at the GTID set from the server and what we've previously seen ... + GtidSet availableServerGtidSet = new GtidSet(availableServerGtidStr); + + // also take into account purged GTID logs + GtidSet purgedServerGtidSet = connection.purgedGtidSet(); + LOGGER.info("GTID set purged on server: {}", purgedServerGtidSet); + + GtidSet filteredGtidSet = + filterGtidSet( + effectiveOffsetContext, + availableServerGtidSet, + purgedServerGtidSet); + if (filteredGtidSet != null) { + // We've seen at least some GTIDs, so start reading from the filtered GTID set + // ... + LOGGER.info("Registering binlog reader with GTID set: {}", filteredGtidSet); + String filteredGtidSetStr = filteredGtidSet.toString(); + client.setGtidSet(filteredGtidSetStr); + effectiveOffsetContext.setCompletedGtidSet(filteredGtidSetStr); + gtidSet = new com.github.shyiko.mysql.binlog.GtidSet(filteredGtidSetStr); + } else { + // We've not yet seen any GTIDs, so that means we have to start reading the + // binlog + // from the beginning ... 
+ client.setBinlogFilename(effectiveOffsetContext.getSource().binlogFilename()); + client.setBinlogPosition(effectiveOffsetContext.getSource().binlogPosition()); + gtidSet = new com.github.shyiko.mysql.binlog.GtidSet(""); + } } } else { // The server is not using GTIDs, so start reading the binlog based upon where we last diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java index 33dac67715..4e9eadd9ef 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java @@ -17,6 +17,7 @@ package org.apache.flink.cdc.connectors.mysql.debezium; +import org.apache.flink.cdc.connectors.mysql.source.client.MariaDBBinaryLogClient; import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig; import org.apache.flink.cdc.connectors.mysql.source.connection.JdbcConnectionFactory; import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset; @@ -102,6 +103,22 @@ public static BinaryLogClient createBinaryClient(Configuration dbzConfiguration) connectorConfig.password()); } + public static BinaryLogClient createBinaryClient( + Configuration dbzConfiguration, boolean isMariaDB) { + final MySqlConnectorConfig connectorConfig = new MySqlConnectorConfig(dbzConfiguration); + return isMariaDB + ? new MariaDBBinaryLogClient( + connectorConfig.hostname(), + connectorConfig.port(), + connectorConfig.username(), + connectorConfig.password()) + : new BinaryLogClient( + connectorConfig.hostname(), + connectorConfig.port(), + connectorConfig.username(), + connectorConfig.password()); + } + /** Creates a new {@link MySqlDatabaseSchema} to monitor the latest MySql database schemas. */ public static MySqlDatabaseSchema createMySqlDatabaseSchema( MySqlConnectorConfig dbzMySqlConfig, boolean isTableIdCaseSensitive) { @@ -126,8 +143,16 @@ public static BinlogOffset currentBinlogOffset(JdbcConnection jdbc) { if (rs.next()) { final String binlogFilename = rs.getString(1); final long binlogPosition = rs.getLong(2); - final String gtidSet = - rs.getMetaData().getColumnCount() > 4 ? rs.getString(5) : null; + final String gtidSet; + if (jdbc instanceof MySqlConnection + && ((MySqlConnection) jdbc).isMariaDB()) { + gtidSet = ((MySqlConnection) jdbc).knownGtidSet(); + } else { + gtidSet = + rs.getMetaData().getColumnCount() > 4 + ? 
rs.getString(5) + : null; + } return BinlogOffset.builder() .setBinlogFilePosition(binlogFilename, binlogPosition) .setGtidSet(gtidSet) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java index 5fc342a4c2..c634cd1a55 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java @@ -26,6 +26,7 @@ import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit; import com.github.shyiko.mysql.binlog.BinaryLogClient; +import com.github.shyiko.mysql.binlog.MariadbGtidSet; import io.debezium.connector.AbstractSourceInfo; import io.debezium.connector.base.ChangeEventQueue; import io.debezium.connector.mysql.GtidSet; @@ -212,7 +213,7 @@ protected MySqlOffsetContext loadStartingOffsetState( private boolean isBinlogAvailable(MySqlOffsetContext offset) { String gtidStr = offset.gtidSet(); if (gtidStr != null) { - return checkGtidSet(offset); + return connection.isMariaDB() ? checkMariaDBGtidSet(offset) : checkGtidSet(offset); } return checkBinlogFilename(offset); @@ -273,6 +274,29 @@ private boolean checkGtidSet(MySqlOffsetContext offset) { return false; } + private boolean checkMariaDBGtidSet(MySqlOffsetContext offset) { + String gtidStr = offset.gtidSet(); + + if (gtidStr.trim().isEmpty()) { + return true; // start at beginning ... + } + + String availableGtidStr = connection.knownGtidSet(); + if (availableGtidStr == null || availableGtidStr.trim().isEmpty()) { + // Last offsets had GTIDs but the server does not use them ... + LOG.warn( + "Connector used GTIDs previously, but MySQL does not know of any GTIDs or they are not enabled"); + return false; + } + // GTIDs are enabled + MariadbGtidSet mariadbGtidSet = new MariadbGtidSet(gtidStr); + // Get the GTID set that is available in the server ... + MariadbGtidSet availableGtidSet = new MariadbGtidSet(availableGtidStr); + // GTIDs are enabled + // MariaDB cannot detect purge gtid. + return mariadbGtidSet.isContainedWithin(availableGtidSet); + } + private boolean checkBinlogFilename(MySqlOffsetContext offset) { String binlogFilename = offset.getSourceInfo().getString(BINLOG_FILENAME_OFFSET_KEY); if (binlogFilename == null) { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/client/MariaDBBinaryLogClient.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/client/MariaDBBinaryLogClient.java new file mode 100644 index 0000000000..0aaed4b769 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/client/MariaDBBinaryLogClient.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.mysql.source.client; + +import com.github.shyiko.mysql.binlog.BinaryLogClient; +import com.github.shyiko.mysql.binlog.network.protocol.command.Command; +import com.github.shyiko.mysql.binlog.network.protocol.command.DumpBinaryLogCommand; +import com.github.shyiko.mysql.binlog.network.protocol.command.QueryCommand; + +import java.io.IOException; +import java.util.logging.Logger; + +/** MariaDBBinaryLogClient. */ +public class MariaDBBinaryLogClient extends BinaryLogClient { + + private final Logger logger = Logger.getLogger(getClass().getName()); + + public MariaDBBinaryLogClient(String hostname, int port, String username, String password) { + super(hostname, port, username, password); + } + + @Override + protected void requestBinaryLogStreamMaria(long serverId) throws IOException { + Command dumpBinaryLogCommand; + + /* + https://jira.mariadb.org/browse/MDEV-225 + if set @mariadb_slave_capability=1, binlogClient can not receive mariadbGtidSet. + */ + channel.write(new QueryCommand("SET @mariadb_slave_capability=4")); + checkError(channel.read()); + + synchronized (gtidSetAccessLock) { + if (null != gtidSet) { + logger.info(gtidSet.toString()); + channel.write( + new QueryCommand( + "SET @slave_connect_state = '" + gtidSet.toString() + "'")); + checkError(channel.read()); + channel.write(new QueryCommand("SET @slave_gtid_strict_mode = 0")); + checkError(channel.read()); + channel.write(new QueryCommand("SET @slave_gtid_ignore_duplicates = 0")); + checkError(channel.read()); + dumpBinaryLogCommand = + new DumpBinaryLogCommand(serverId, "", 0L, isUseSendAnnotateRowsEvent()); + } else { + dumpBinaryLogCommand = + new DumpBinaryLogCommand( + serverId, getBinlogFilename(), getBinlogPosition()); + } + } + channel.write(dumpBinaryLogCommand); + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffset.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffset.java index af7948ac92..406be96daa 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffset.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffset.java @@ -21,7 +21,8 @@ import org.apache.flink.cdc.common.annotation.PublicEvolving; import org.apache.flink.cdc.common.annotation.VisibleForTesting; -import io.debezium.connector.mysql.GtidSet; +import com.github.shyiko.mysql.binlog.GtidSet; +import com.github.shyiko.mysql.binlog.MariadbGtidSet; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.connect.errors.ConnectException; @@ -188,8 +189,14 @@ public int compareTo(BinlogOffset that) { // 
The target offset uses GTIDs, so we ideally compare using GTIDs ... if (StringUtils.isNotEmpty(gtidSetStr)) { // Both have GTIDs, so base the comparison entirely on the GTID sets. - GtidSet gtidSet = new GtidSet(gtidSetStr); - GtidSet targetGtidSet = new GtidSet(targetGtidSetStr); + com.github.shyiko.mysql.binlog.GtidSet gtidSet = + gtidSetStr.contains(":") + ? new com.github.shyiko.mysql.binlog.GtidSet(gtidSetStr) + : new MariadbGtidSet(gtidSetStr); + com.github.shyiko.mysql.binlog.GtidSet targetGtidSet = + targetGtidSetStr.contains(":") + ? new GtidSet(targetGtidSetStr) + : new MariadbGtidSet(targetGtidSetStr); if (gtidSet.equals(targetGtidSet)) { long restartSkipEvents = this.getRestartSkipEvents(); long targetRestartSkipEvents = that.getRestartSkipEvents(); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSplitReader.java index 4a46b7dc37..8c1da08a9d 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSplitReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSplitReader.java @@ -48,6 +48,7 @@ import java.util.Iterator; import java.util.Set; +import static org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createBinaryClient; import static org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlBinlogSplitAssigner.BINLOG_SPLIT_ID; /** The {@link SplitReader} implementation for the {@link MySqlSource}. 
*/
@@ -236,7 +237,8 @@ private SnapshotSplitReader getSnapshotSplitReader() {
         final MySqlConnection jdbcConnection =
                 DebeziumUtils.createMySqlConnection(sourceConfig);
         final BinaryLogClient binaryLogClient =
-                DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
+                createBinaryClient(
+                        sourceConfig.getDbzConfiguration(), jdbcConnection.isMariaDB());
         final StatefulTaskContext statefulTaskContext =
                 new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
         reusedSnapshotReader =
@@ -250,7 +252,8 @@ private BinlogSplitReader getBinlogSplitReader() {
         final MySqlConnection jdbcConnection =
                 DebeziumUtils.createMySqlConnection(sourceConfig);
         final BinaryLogClient binaryLogClient =
-                DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
+                createBinaryClient(
+                        sourceConfig.getDbzConfiguration(), jdbcConnection.isMariaDB());
         final StatefulTaskContext statefulTaskContext =
                 new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
         reusedBinlogReader = new BinlogSplitReader(statefulTaskContext, subtaskId);
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MariaDBConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MariaDBConnectorITCase.java
new file mode 100644
index 0000000000..b780851973
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MariaDBConnectorITCase.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.table;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+/** Integration tests to check mysql-cdc works well with different MariaDB server versions.
*/ +public class MariaDBConnectorITCase extends MariaDBTestBase { + protected static final Logger LOG = LoggerFactory.getLogger(MariaDBConnectorITCase.class); + + protected static final int DEFAULT_PARALLELISM = 4; + private final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + protected final StreamTableEnvironment tEnv = + StreamTableEnvironment.create( + env, EnvironmentSettings.newInstance().inStreamingMode().build()); + + private final boolean incrementalSnapshot = true; + + public MariaDBConnectorITCase() {} + + @Before + public void before() { + TestValuesTableFactory.clearAllData(); + if (incrementalSnapshot) { + env.setParallelism(DEFAULT_PARALLELISM); + env.enableCheckpointing(200); + } else { + env.setParallelism(1); + } + } + + @Test + public void testConsumingAllEvents() throws Exception { + initializeTable("inventory"); + String sourceDDL = + String.format( + "CREATE TABLE debezium_source (" + + " `id` INT NOT NULL," + + " name STRING," + + " description STRING," + + " weight DECIMAL(10,3)," + + " primary key (`id`) not enforced" + + ") WITH (" + + " 'connector' = 'mysql-cdc'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = '%s'," + + " 'table-name' = '%s'," + + " 'scan.incremental.snapshot.enabled' = '%s'," + + " 'server-time-zone' = 'UTC'," + + " 'server-id' = '%s'," + + " 'scan.incremental.snapshot.chunk.size' = '%s'" + + ")", + MARIADB_CONTAINER.getHost(), + MARIADB_CONTAINER.getMappedPort(3306), + // MARIADB_CONTAINER.getUsername(), + "root", + // MARIADB_CONTAINER.getPassword(), + "flinkpwd", + MARIADB_CONTAINER.getDatabaseName(), + "products", + incrementalSnapshot, + getServerId(), + getSplitSize()); + String sinkDDL = + "CREATE TABLE sink (" + + " name STRING," + + " weightSum DECIMAL(10,3)," + + " PRIMARY KEY (name) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'values'," + + " 'sink-insert-only' = 'false'," + + " 'sink-expected-messages-num' = '20'" + + ")"; + tEnv.executeSql(sourceDDL); + tEnv.executeSql(sinkDDL); + + // async submit job + TableResult result = + tEnv.executeSql( + "INSERT INTO sink SELECT name, SUM(weight) FROM debezium_source GROUP BY name"); + + waitForSnapshotStarted("sink"); + + try (Connection connection = getJdbcConnection(); + Statement statement = connection.createStatement()) { + + statement.execute( + "UPDATE products SET description='18oz carpenter hammer' WHERE id=106;"); + statement.execute("UPDATE products SET weight='5.1' WHERE id=107;"); + statement.execute( + "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110 + statement.execute( + "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);"); + statement.execute( + "UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;"); + statement.execute("UPDATE products SET weight='5.17' WHERE id=111;"); + statement.execute("DELETE FROM products WHERE id=111;"); + } + + waitForSinkSize("sink", 20); + + /* + *
+         * The final database table looks like this:
+         *
+         * > SELECT * FROM products;
+         * +-----+--------------------+---------------------------------------------------------+--------+
+         * | id  | name               | description                                             | weight |
+         * +-----+--------------------+---------------------------------------------------------+--------+
+         * | 101 | scooter            | Small 2-wheel scooter                                   |   3.14 |
+         * | 102 | car battery        | 12V car battery                                         |    8.1 |
+         * | 103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 |    0.8 |
+         * | 104 | hammer             | 12oz carpenter's hammer                                 |   0.75 |
+         * | 105 | hammer             | 14oz carpenter's hammer                                 |  0.875 |
+         * | 106 | hammer             | 18oz carpenter hammer                                   |      1 |
+         * | 107 | rocks              | box of assorted rocks                                   |    5.1 |
+         * | 108 | jacket             | water resistent black wind breaker                      |    0.1 |
+         * | 109 | spare tire         | 24 inch spare tire                                      |   22.2 |
+         * | 110 | jacket             | new water resistent white wind breaker                  |    0.5 |
+         * +-----+--------------------+---------------------------------------------------------+--------+
+         * 
+ */ + + String[] expected = + new String[] { + "+I[scooter, 3.140]", + "+I[car battery, 8.100]", + "+I[12-pack drill bits, 0.800]", + "+I[hammer, 2.625]", + "+I[rocks, 5.100]", + "+I[jacket, 0.600]", + "+I[spare tire, 22.200]" + }; + + List actual = TestValuesTableFactory.getResults("sink"); + assertEqualsInAnyOrder(Arrays.asList(expected), actual); + result.getJobClient().get().cancel().get(); + } + + protected String getServerId() { + final Random random = new Random(); + int serverId = random.nextInt(100) + 5400; + if (incrementalSnapshot) { + return serverId + "-" + (serverId + env.getParallelism()); + } + return String.valueOf(serverId); + } + + protected String getServerId(int base) { + if (incrementalSnapshot) { + return base + "-" + (base + DEFAULT_PARALLELISM); + } + return String.valueOf(base); + } + + protected int getSplitSize() { + if (incrementalSnapshot) { + // test parallel read + return 4; + } + return 0; + } + + protected static void waitForSnapshotStarted(String sinkName) throws InterruptedException { + while (sinkSize(sinkName) == 0) { + Thread.sleep(100); + } + } + + protected static void waitForSinkSize(String sinkName, int expectedSize) + throws InterruptedException { + while (sinkSize(sinkName) < expectedSize) { + Thread.sleep(100); + } + } + + private static int sinkSize(String sinkName) { + synchronized (TestValuesTableFactory.class) { + try { + return TestValuesTableFactory.getRawResults(sinkName).size(); + } catch (IllegalArgumentException e) { + // job is not started yet + return 0; + } + } + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MariaDBTestBase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MariaDBTestBase.java new file mode 100644 index 0000000000..59e555a3ff --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MariaDBTestBase.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.connectors.mysql.table; + +import org.apache.flink.cdc.connectors.mysql.testutils.MariaDBContainer; + +import org.junit.BeforeClass; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.lifecycle.Startables; + +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** MariaDBTestBase class. */ +public class MariaDBTestBase { + + protected static final Logger LOG = LoggerFactory.getLogger(MariaDBTestBase.class); + + protected static final MariaDBContainer MARIADB_CONTAINER = createMariaDBContainer(); + + private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$"); + + @BeforeClass + public static void startContainers() { + LOG.info("Starting containers..."); + Startables.deepStart(Stream.of(MARIADB_CONTAINER)).join(); + LOG.info("Containers are started."); + } + + public Connection getJdbcConnection() throws SQLException { + return DriverManager.getConnection( + MARIADB_CONTAINER.getJdbcUrl(), + MARIADB_CONTAINER.getUsername(), + MARIADB_CONTAINER.getPassword()); + } + + public static MariaDBContainer createMariaDBContainer() { + return MariaDBContainer.createMariaDBContainer(); + } + + protected void initializeTable(String sqlFile) { + final String ddlFile = String.format("ddl/%s.sql", sqlFile); + final URL ddlTestFile = MariaDBTestBase.class.getClassLoader().getResource(ddlFile); + assertNotNull("Cannot locate " + ddlFile, ddlTestFile); + try (Connection connection = getJdbcConnection(); + Statement statement = connection.createStatement()) { + final List statements = + Arrays.stream( + Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream() + .map(String::trim) + .filter(x -> !x.startsWith("--") && !x.isEmpty()) + .map( + x -> { + final Matcher m = + COMMENT_PATTERN.matcher(x); + return m.matches() ? 
m.group(1) : x; + }) + .collect(Collectors.joining("\n")) + .split(";")) + .collect(Collectors.toList()); + for (String stmt : statements) { + statement.execute(stmt); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + // ------------------------------------------------------------------------ + // test utilities + // ------------------------------------------------------------------------ + public static void assertEqualsInAnyOrder(List expected, List actual) { + assertTrue(expected != null && actual != null); + assertEqualsInOrder( + expected.stream().sorted().collect(Collectors.toList()), + actual.stream().sorted().collect(Collectors.toList())); + } + + public static void assertEqualsInOrder(List expected, List actual) { + assertTrue(expected != null && actual != null); + assertEquals(expected.size(), actual.size()); + assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0])); + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MariaDBContainer.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MariaDBContainer.java new file mode 100644 index 0000000000..259a0be643 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MariaDBContainer.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.mysql.testutils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.output.Slf4jLogConsumer; + +/** Docker container for MariaDB. 
*/ +public class MariaDBContainer extends org.testcontainers.containers.MariaDBContainer { + + protected static final Logger LOG = LoggerFactory.getLogger(MariaDBContainer.class); + + public MariaDBContainer(String dockerImageName) { + super(dockerImageName); + } + + @Override + public String getDriverClassName() { + try { + Class.forName("com.mysql.cj.jdbc.Driver"); + return "com.mysql.cj.jdbc.Driver"; + } catch (ClassNotFoundException e) { + return "com.mysql.jdbc.Driver"; + } + } + + @Override + public String getJdbcUrl() { + return getJdbcUrl(getDatabaseName()); + } + + public String getJdbcUrl(String databaseName) { + String additionalUrlParams = constructUrlParameters("?", "&"); + return "jdbc:mysql://" + + getHost() + + ":" + + getMappedPort(3306) + + "/" + + databaseName + + additionalUrlParams; + } + + public static MariaDBContainer createMariaDBContainer() { + MariaDBContainer mariaDBContainer = + (MariaDBContainer) + new MariaDBContainer("mariadb:11.5-rc") + .withConfigurationOverride("docker/server") + .withDatabaseName("flink-test") + .withUsername("flinkuser") + .withPassword("flinkpwd") + .withEnv("MYSQL_ROOT_PASSWORD", "123456") + .withLogConsumer(new Slf4jLogConsumer(LOG)); + return mariaDBContainer; + } +} diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/ddl/mariadb_inventory.sql b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/ddl/mariadb_inventory.sql new file mode 100644 index 0000000000..974836ef91 --- /dev/null +++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/ddl/mariadb_inventory.sql @@ -0,0 +1,57 @@ +-- Copyright 2023 Ververica Inc. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- http://www.apache.org/licenses/LICENSE-2.0 +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: mysql_inventory +-- ---------------------------------------------------------------------------------------------------------------- + +-- Create and populate our products using a single insert with many rows +CREATE TABLE products_source ( + id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, + name VARCHAR(255) NOT NULL DEFAULT 'flink', + description VARCHAR(512), + weight FLOAT, + enum_c enum('red', 'white') default 'red', -- test some complex types as well, + json_c JSON, -- because we use additional dependencies to deserialize complex types. 
+ point_c POINT +); +ALTER TABLE products_source AUTO_INCREMENT = 101; + +INSERT INTO products_source +VALUES (default,"scooter","Small 2-wheel scooter",3.14, 'red', '{"key1": "value1"}', ST_GeomFromText('POINT(1 1)')), + (default,"car battery","12V car battery",8.1, 'white', '{"key2": "value2"}', ST_GeomFromText('POINT(2 2)')), + (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8, 'red', '{"key3": "value3"}', ST_GeomFromText('POINT(3 3)')), + (default,"hammer","12oz carpenter's hammer",0.75, 'white', '{"key4": "value4"}', ST_GeomFromText('POINT(4 4)')), + (default,"hammer","14oz carpenter's hammer",0.875, 'red', '{"k1": "v1", "k2": "v2"}', ST_GeomFromText('POINT(5 5)')), + (default,"hammer","16oz carpenter's hammer",1.0, null, null, null), + (default,"rocks","box of assorted rocks",5.3, null, null, null), + (default,"jacket","water resistent black wind breaker",0.1, null, null, null), + (default,"spare tire","24 inch spare tire",22.2, null, null, null); + + +CREATE TABLE products_sink ( + id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, + name VARCHAR(255) NOT NULL DEFAULT 'flink', + description VARCHAR(512), + weight FLOAT, + enum_c VARCHAR(255), + json_c VARCHAR(255), + point_c VARCHAR(255) +); + +CREATE TABLE mongodb_products_sink ( + id VARCHAR (255) NOT NULL PRIMARY KEY, + name VARCHAR(255) NOT NULL DEFAULT 'flink', + description VARCHAR(512), + weight FLOAT +); \ No newline at end of file
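A note on the GTID handling that ties the BinlogOffset and StatefulTaskContext changes together: MySQL GTID sets are written as source_uuid:interval and therefore always contain a colon, while MariaDB GTIDs are domain-server_id-sequence triples with no colon. That is why the BinlogOffset.compareTo change picks the GtidSet implementation with a contains(":") check, and why checkMariaDBGtidSet validates the saved offset with MariadbGtidSet containment against the server's gtid_current_pos. The sketch below illustrates both points with the same mysql-binlog-connector-java classes the patch imports; the class and the GTID values are invented for the example and the printed results are what one would expect under those assumptions.

    import com.github.shyiko.mysql.binlog.GtidSet;
    import com.github.shyiko.mysql.binlog.MariadbGtidSet;

    /** Illustrative sketch only; the GTID strings are made up. */
    public class GtidFormatSketch {

        public static void main(String[] args) {
            // MySQL GTID sets are "source_uuid:interval[,...]", so the string always contains ':'.
            String mysqlGtidSet = "24bc7850-2c16-11e6-a073-0242ac110002:1-100";
            // MariaDB GTIDs are "domain-server_id-sequence" triples, with no ':' separator.
            String mariadbGtidSet = "0-1-100";

            // Same dispatch rule as the BinlogOffset.compareTo change above.
            GtidSet parsedMysql = parse(mysqlGtidSet);
            GtidSet parsedMaria = parse(mariadbGtidSet);
            System.out.println(parsedMysql.getClass().getSimpleName()); // GtidSet
            System.out.println(parsedMaria.getClass().getSimpleName()); // MariadbGtidSet

            // Same containment check checkMariaDBGtidSet applies against gtid_current_pos:
            // the saved offset must still be covered by what the server currently reports.
            MariadbGtidSet savedOffset = new MariadbGtidSet("0-1-100");
            MariadbGtidSet serverPosition = new MariadbGtidSet("0-1-250");
            System.out.println(savedOffset.isContainedWithin(serverPosition)); // expected: true
        }

        private static GtidSet parse(String gtidSetStr) {
            return gtidSetStr.contains(":")
                    ? new GtidSet(gtidSetStr)
                    : new MariadbGtidSet(gtidSetStr);
        }
    }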