From 6240cbaddd85aa1084c5ba5a0325fb52ed06abf6 Mon Sep 17 00:00:00 2001 From: He Wang Date: Mon, 12 Aug 2024 16:40:30 +0800 Subject: [PATCH] simplify lifecycleListener with AbstractLifecycleListener --- .../mysql/debezium/DebeziumUtils.java | 33 +++++++------------ 1 file changed, 11 insertions(+), 22 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java index 4ce84fa4e6e..69819c74001 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java @@ -247,11 +247,11 @@ public static BinlogOffset findBinlogOffset( if (binlogFiles.isEmpty()) { return BinlogOffset.ofBinlogFilePosition("", 0); } + if (binlogFiles.size() == 1) { + return BinlogOffset.ofBinlogFilePosition(binlogFiles.get(0), 0); + } - MySqlConnection.MySqlConnectionConfiguration config = connection.connectionConfig(); - BinaryLogClient client = - new BinaryLogClient( - config.hostname(), config.port(), config.username(), config.password()); + BinaryLogClient client = createBinaryClient(mySqlSourceConfig.getDbzConfiguration()); if (mySqlSourceConfig.getServerIdRange() != null) { client.setServerId(mySqlSourceConfig.getServerIdRange().getStartServerId()); } @@ -266,8 +266,7 @@ public static BinlogOffset findBinlogOffset( } private static String searchBinlogName( - BinaryLogClient client, long targetMs, List binlogFiles) - throws IOException, InterruptedException { + BinaryLogClient client, long targetMs, List binlogFiles) throws IOException { int startIdx = 0; int endIdx = binlogFiles.size() - 1; @@ -288,11 +287,13 @@ private static String searchBinlogName( } } - return endIdx < 0 ? binlogFiles.get(0) : binlogFiles.get(endIdx); + return binlogFiles.isEmpty() + ? "" + : endIdx < 0 ? binlogFiles.get(0) : binlogFiles.get(endIdx); } private static long getBinlogTimestamp(BinaryLogClient client, String binlogFile) - throws IOException, InterruptedException { + throws IOException { ArrayBlockingQueue binlogTimestamps = new ArrayBlockingQueue<>(1); BinaryLogClient.EventListener eventListener = @@ -318,24 +319,11 @@ private static long getBinlogTimestamp(BinaryLogClient client, String binlogFile ArrayBlockingQueue exceptions = new ArrayBlockingQueue<>(1); BinaryLogClient.LifecycleListener lifecycleListener = - new BinaryLogClient.LifecycleListener() { - - @Override - public void onConnect(BinaryLogClient client) {} - + new BinaryLogClient.AbstractLifecycleListener() { @Override public void onCommunicationFailure(BinaryLogClient client, Exception e) { - LOG.error("BinaryLogClient onCommunicationFailure", e); exceptions.add(e); } - - @Override - public void onEventDeserializationFailure(BinaryLogClient client, Exception e) { - LOG.warn("BinaryLogClient onEventDeserializationFailure", e); - } - - @Override - public void onDisconnect(BinaryLogClient client) {} }; client.registerEventListener(eventListener); @@ -347,6 +335,7 @@ public void onDisconnect(BinaryLogClient client) {} try { client.connect(); } finally { + client.disconnect(); client.unregisterLifecycleListener(lifecycleListener); client.unregisterEventListener(eventListener); }