Skip to content

Commit

Permalink
[FLINK-36024] Handle BinaryLogClient exceptions when searching for th…
Browse files Browse the repository at this point in the history
…e starting binlog offset
  • Loading branch information
whhe committed Aug 9, 2024
1 parent b937db2 commit 7665492
Showing 1 changed file with 47 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@

import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -242,33 +241,23 @@ private static Map<String, String> querySystemVariables(

public static BinlogOffset findBinlogOffset(
long targetMs, MySqlConnection connection, MySqlSourceConfig mySqlSourceConfig) {
List<String> binlogFiles = connection.availableBinlogFiles();
LOG.info("Available binlog files: {}", binlogFiles);

if (binlogFiles.isEmpty()) {
return BinlogOffset.ofBinlogFilePosition("", 0);
}

MySqlConnection.MySqlConnectionConfiguration config = connection.connectionConfig();
BinaryLogClient client =
new BinaryLogClient(
config.hostname(), config.port(), config.username(), config.password());
if (mySqlSourceConfig.getServerIdRange() != null) {
client.setServerId(mySqlSourceConfig.getServerIdRange().getStartServerId());
}
List<String> binlogFiles = new ArrayList<>();
JdbcConnection.ResultSetConsumer rsc =
rs -> {
while (rs.next()) {
String fileName = rs.getString(1);
long fileSize = rs.getLong(2);
if (fileSize > 0) {
binlogFiles.add(fileName);
}
}
};

LOG.info("Start querying binlog files for timestamp {}", targetMs);
try {
connection.query("SHOW BINARY LOGS", rsc);
LOG.info("Total search binlog: {}", binlogFiles);

if (binlogFiles.isEmpty()) {
return BinlogOffset.ofBinlogFilePosition("", 0);
}

String binlogName = searchBinlogName(client, targetMs, binlogFiles);
return BinlogOffset.ofBinlogFilePosition(binlogName, 0);
} catch (Exception e) {
Expand All @@ -285,6 +274,11 @@ private static String searchBinlogName(
while (startIdx <= endIdx) {
int mid = startIdx + (endIdx - startIdx) / 2;
long midTs = getBinlogTimestamp(client, binlogFiles.get(mid));
if (midTs < 0) {
binlogFiles.remove(mid);
endIdx--;
continue;
}
if (midTs < targetMs) {
startIdx = mid + 1;
} else if (targetMs < midTs) {
Expand Down Expand Up @@ -322,16 +316,44 @@ private static long getBinlogTimestamp(BinaryLogClient client, String binlogFile
}
};

try {
client.registerEventListener(eventListener);
client.setBinlogFilename(binlogFile);
client.setBinlogPosition(0);
ArrayBlockingQueue<Exception> exceptions = new ArrayBlockingQueue<>(1);
BinaryLogClient.LifecycleListener lifecycleListener =
new BinaryLogClient.LifecycleListener() {

LOG.info("begin parse binlog: {}", binlogFile);
@Override
public void onConnect(BinaryLogClient client) {}

@Override
public void onCommunicationFailure(BinaryLogClient client, Exception e) {
LOG.error("BinaryLogClient onCommunicationFailure", e);
exceptions.add(e);
}

@Override
public void onEventDeserializationFailure(BinaryLogClient client, Exception e) {
LOG.warn("BinaryLogClient onEventDeserializationFailure", e);
}

@Override
public void onDisconnect(BinaryLogClient client) {}
};

client.registerEventListener(eventListener);
client.registerLifecycleListener(lifecycleListener);
client.setBinlogFilename(binlogFile);
client.setBinlogPosition(0);

LOG.info("Start parsing binlog: {}", binlogFile);
try {
client.connect();
} finally {
client.unregisterLifecycleListener(lifecycleListener);
client.unregisterEventListener(eventListener);
}
return binlogTimestamps.take();

if (!exceptions.isEmpty()) {
throw new RuntimeException(exceptions.peek());
}
return binlogTimestamps.isEmpty() ? -1L : binlogTimestamps.peek();
}
}

0 comments on commit 7665492

Please sign in to comment.