Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[mysql] Mysql-cdc adapt mariadb. #2494

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,13 @@ limitations under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mariadb</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.schema.DatabaseSchema;
import io.debezium.util.Strings;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -37,6 +38,7 @@
import java.util.Map;
import java.util.OptionalLong;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;

/**
* Copied from Debezium project(1.9.8.final) to add custom jdbc properties in the jdbc url. The new
Expand Down Expand Up @@ -74,6 +76,8 @@ public class MySqlConnection extends JdbcConnection {

private final String urlPattern;

private final boolean isMariaDB;

/**
* Creates a new connection using the supplied configuration.
*
Expand All @@ -90,6 +94,7 @@ public MySqlConnection(
this.connectionConfig = connectionConfig;
this.mysqlFieldReader = fieldReader;
this.urlPattern = connectionConfig.getUrlPattern();
this.isMariaDB = queryIsMariadb();
}

/**
Expand Down Expand Up @@ -140,6 +145,28 @@ protected Map<String, String> readMySqlSystemVariables() {
return querySystemVariables(SQL_SHOW_SYSTEM_VARIABLES);
}

public boolean isMariaDB() {
return isMariaDB;
}

private boolean queryIsMariadb() {
AtomicReference<Boolean> mode = new AtomicReference<>(false);
try {
query(
"SELECT VERSION()",
rs -> {
if (rs.next()) {
String version = rs.getString(1);
mode.set(version.toLowerCase().contains("mariadb"));
}
});
} catch (SQLException e) {
throw new ConnectException(
"Unexpected error while connecting to MySQL and looking at GTID mode: ", e);
}
return mode.get();
}

private Map<String, String> querySystemVariables(String statement) {
final Map<String, String> variables = new HashMap<>();
try {
Expand Down Expand Up @@ -252,14 +279,25 @@ protected String getSessionVariableForSslVersion() {
*/
public boolean isGtidModeEnabled() {
try {
return queryAndMap(
"SHOW GLOBAL VARIABLES LIKE 'GTID_MODE'",
rs -> {
if (rs.next()) {
return !"OFF".equalsIgnoreCase(rs.getString(2));
}
return false;
});
if (isMariaDB) {
return queryAndMap(
"SHOW GLOBAL VARIABLES LIKE 'GTID_CURRENT_POS'",
rs -> {
if (rs.next()) {
return !"".equalsIgnoreCase(rs.getString(2));
}
return false;
});
} else {
return queryAndMap(
"SHOW GLOBAL VARIABLES LIKE 'GTID_MODE'",
rs -> {
if (rs.next()) {
return !"OFF".equalsIgnoreCase(rs.getString(2));
}
return false;
});
}
} catch (SQLException e) {
throw new DebeziumException(
"Unexpected error while connecting to MySQL and looking at GTID mode: ", e);
Expand All @@ -274,15 +312,27 @@ public boolean isGtidModeEnabled() {
*/
public String knownGtidSet() {
try {
return queryAndMap(
"SHOW MASTER STATUS",
rs -> {
if (rs.next() && rs.getMetaData().getColumnCount() > 4) {
return rs.getString(
5); // GTID set, may be null, blank, or contain a GTID set
}
return "";
});
if (isMariaDB) {
return queryAndMap(
"SHOW GLOBAL VARIABLES LIKE 'GTID_CURRENT_POS';",
rs -> {
if (rs.next()) {
return rs.getString(
2); // GTID set, may be null, blank, or contain a GTID set
}
return "";
});
} else {
return queryAndMap(
"SHOW MASTER STATUS",
rs -> {
if (rs.next() && rs.getMetaData().getColumnCount() > 4) {
return rs.getString(
5); // GTID set, may be null, blank, or contain a GTID set
}
return "";
});
}
} catch (SQLException e) {
throw new DebeziumException(
"Unexpected error while connecting to MySQL and looking at GTID mode: ", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.GtidEventData;
import com.github.shyiko.mysql.binlog.event.MariadbGtidEventData;
import com.github.shyiko.mysql.binlog.event.QueryEventData;
import com.github.shyiko.mysql.binlog.event.RotateEventData;
import com.github.shyiko.mysql.binlog.event.RowsQueryEventData;
Expand All @@ -24,6 +25,7 @@
import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.GtidEventDataDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.MariadbGtidEventDataDeserializer;
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
import com.github.shyiko.mysql.binlog.network.AuthenticationException;
import com.github.shyiko.mysql.binlog.network.DefaultSSLSocketFactory;
Expand Down Expand Up @@ -311,6 +313,8 @@ public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
// Add our custom deserializers ...
eventDeserializer.setEventDataDeserializer(EventType.STOP, new StopEventDataDeserializer());
eventDeserializer.setEventDataDeserializer(EventType.GTID, new GtidEventDataDeserializer());
eventDeserializer.setEventDataDeserializer(
EventType.MARIADB_GTID, new MariadbGtidEventDataDeserializer());
eventDeserializer.setEventDataDeserializer(
EventType.WRITE_ROWS,
new RowDeserializers.WriteRowsDeserializer(tableMapEventByTableId));
Expand Down Expand Up @@ -570,6 +574,15 @@ protected void handleGtidEvent(MySqlOffsetContext offsetContext, Event event) {
metrics.onGtidChange(gtid);
}

protected void handleMariaGtidEvent(MySqlOffsetContext offsetContext, Event event) {
LOGGER.debug("GTID transaction: {}", event);
MariadbGtidEventData gtidEvent = unwrapData(event);
String gtid = gtidEvent.toString();
gtidSet.add(gtid);
offsetContext.startGtid(gtid, gtidSet.toString()); // rather than use the client's GTID set
metrics.onGtidChange(gtid);
}

/**
* Handle the supplied event with an {@link RowsQueryEventData} by recording the original SQL
* query that generated the event.
Expand Down Expand Up @@ -1117,33 +1130,51 @@ public void execute(
// the last Debezium checkpoint.
String availableServerGtidStr = connection.knownGtidSet();
if (isGtidModeEnabled) {
// The server is using GTIDs, so enable the handler ...
eventHandlers.put(
EventType.GTID, (event) -> handleGtidEvent(effectiveOffsetContext, event));

// Now look at the GTID set from the server and what we've previously seen ...
GtidSet availableServerGtidSet = new GtidSet(availableServerGtidStr);

// also take into account purged GTID logs
GtidSet purgedServerGtidSet = connection.purgedGtidSet();
LOGGER.info("GTID set purged on server: {}", purgedServerGtidSet);

GtidSet filteredGtidSet =
filterGtidSet(
effectiveOffsetContext, availableServerGtidSet, purgedServerGtidSet);
if (filteredGtidSet != null) {
// We've seen at least some GTIDs, so start reading from the filtered GTID set ...
LOGGER.info("Registering binlog reader with GTID set: {}", filteredGtidSet);
String filteredGtidSetStr = filteredGtidSet.toString();
client.setGtidSet(filteredGtidSetStr);
effectiveOffsetContext.setCompletedGtidSet(filteredGtidSetStr);
gtidSet = new com.github.shyiko.mysql.binlog.GtidSet(filteredGtidSetStr);
if (connection.isMariaDB()) {
eventHandlers.put(
EventType.MARIADB_GTID,
(event) -> handleMariaGtidEvent(effectiveOffsetContext, event));
if (offsetContext.gtidSet() != null) {
// MariaDB can not detect purged GTID logs.
client.setGtidSet(offsetContext.gtidSet());
offsetContext.setCompletedGtidSet(offsetContext.gtidSet());
gtidSet =
new com.github.shyiko.mysql.binlog.MariadbGtidSet(
offsetContext.gtidSet());
}
} else {
// We've not yet seen any GTIDs, so that means we have to start reading the binlog
// from the beginning ...
client.setBinlogFilename(effectiveOffsetContext.getSource().binlogFilename());
client.setBinlogPosition(effectiveOffsetContext.getSource().binlogPosition());
gtidSet = new com.github.shyiko.mysql.binlog.GtidSet("");
// The server is using GTIDs, so enable the handler ...
eventHandlers.put(
EventType.GTID, (event) -> handleGtidEvent(effectiveOffsetContext, event));

// Now look at the GTID set from the server and what we've previously seen ...
GtidSet availableServerGtidSet = new GtidSet(availableServerGtidStr);

// also take into account purged GTID logs
GtidSet purgedServerGtidSet = connection.purgedGtidSet();
LOGGER.info("GTID set purged on server: {}", purgedServerGtidSet);

GtidSet filteredGtidSet =
filterGtidSet(
effectiveOffsetContext,
availableServerGtidSet,
purgedServerGtidSet);
if (filteredGtidSet != null) {
// We've seen at least some GTIDs, so start reading from the filtered GTID set
// ...
LOGGER.info("Registering binlog reader with GTID set: {}", filteredGtidSet);
String filteredGtidSetStr = filteredGtidSet.toString();
client.setGtidSet(filteredGtidSetStr);
effectiveOffsetContext.setCompletedGtidSet(filteredGtidSetStr);
gtidSet = new com.github.shyiko.mysql.binlog.GtidSet(filteredGtidSetStr);
} else {
// We've not yet seen any GTIDs, so that means we have to start reading the
// binlog
// from the beginning ...
client.setBinlogFilename(effectiveOffsetContext.getSource().binlogFilename());
client.setBinlogPosition(effectiveOffsetContext.getSource().binlogPosition());
gtidSet = new com.github.shyiko.mysql.binlog.GtidSet("");
}
}
} else {
// The server is not using GTIDs, so start reading the binlog based upon where we last
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.cdc.connectors.mysql.debezium;

import org.apache.flink.cdc.connectors.mysql.source.client.MariaDBBinaryLogClient;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import org.apache.flink.cdc.connectors.mysql.source.connection.JdbcConnectionFactory;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
Expand Down Expand Up @@ -102,6 +103,22 @@ public static BinaryLogClient createBinaryClient(Configuration dbzConfiguration)
connectorConfig.password());
}

public static BinaryLogClient createBinaryClient(
Configuration dbzConfiguration, boolean isMariaDB) {
final MySqlConnectorConfig connectorConfig = new MySqlConnectorConfig(dbzConfiguration);
return isMariaDB
? new MariaDBBinaryLogClient(
connectorConfig.hostname(),
connectorConfig.port(),
connectorConfig.username(),
connectorConfig.password())
: new BinaryLogClient(
connectorConfig.hostname(),
connectorConfig.port(),
connectorConfig.username(),
connectorConfig.password());
}

/** Creates a new {@link MySqlDatabaseSchema} to monitor the latest MySql database schemas. */
public static MySqlDatabaseSchema createMySqlDatabaseSchema(
MySqlConnectorConfig dbzMySqlConfig, boolean isTableIdCaseSensitive) {
Expand All @@ -126,8 +143,16 @@ public static BinlogOffset currentBinlogOffset(JdbcConnection jdbc) {
if (rs.next()) {
final String binlogFilename = rs.getString(1);
final long binlogPosition = rs.getLong(2);
final String gtidSet =
rs.getMetaData().getColumnCount() > 4 ? rs.getString(5) : null;
final String gtidSet;
if (jdbc instanceof MySqlConnection
&& ((MySqlConnection) jdbc).isMariaDB()) {
gtidSet = ((MySqlConnection) jdbc).knownGtidSet();
} else {
gtidSet =
rs.getMetaData().getColumnCount() > 4
? rs.getString(5)
: null;
}
return BinlogOffset.builder()
.setBinlogFilePosition(binlogFilename, binlogPosition)
.setGtidSet(gtidSet)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.MariadbGtidSet;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.mysql.GtidSet;
Expand Down Expand Up @@ -212,7 +213,7 @@ protected MySqlOffsetContext loadStartingOffsetState(
private boolean isBinlogAvailable(MySqlOffsetContext offset) {
String gtidStr = offset.gtidSet();
if (gtidStr != null) {
return checkGtidSet(offset);
return connection.isMariaDB() ? checkMariaDBGtidSet(offset) : checkGtidSet(offset);
}

return checkBinlogFilename(offset);
Expand Down Expand Up @@ -273,6 +274,29 @@ private boolean checkGtidSet(MySqlOffsetContext offset) {
return false;
}

private boolean checkMariaDBGtidSet(MySqlOffsetContext offset) {
String gtidStr = offset.gtidSet();

if (gtidStr.trim().isEmpty()) {
return true; // start at beginning ...
}

String availableGtidStr = connection.knownGtidSet();
if (availableGtidStr == null || availableGtidStr.trim().isEmpty()) {
// Last offsets had GTIDs but the server does not use them ...
LOG.warn(
"Connector used GTIDs previously, but MySQL does not know of any GTIDs or they are not enabled");
return false;
}
// GTIDs are enabled
MariadbGtidSet mariadbGtidSet = new MariadbGtidSet(gtidStr);
// Get the GTID set that is available in the server ...
MariadbGtidSet availableGtidSet = new MariadbGtidSet(availableGtidStr);
// GTIDs are enabled
// MariaDB cannot detect purge gtid.
return mariadbGtidSet.isContainedWithin(availableGtidSet);
}

private boolean checkBinlogFilename(MySqlOffsetContext offset) {
String binlogFilename = offset.getSourceInfo().getString(BINLOG_FILENAME_OFFSET_KEY);
if (binlogFilename == null) {
Expand Down
Loading
Loading