Skip to content

Commit

Permalink
simplify lifecycleListener with AbstractLifecycleListener
Browse files Browse the repository at this point in the history
  • Loading branch information
whhe committed Aug 12, 2024
1 parent 7665492 commit 6240cba
Showing 1 changed file with 11 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -247,11 +247,11 @@ public static BinlogOffset findBinlogOffset(
if (binlogFiles.isEmpty()) {
return BinlogOffset.ofBinlogFilePosition("", 0);
}
if (binlogFiles.size() == 1) {
return BinlogOffset.ofBinlogFilePosition(binlogFiles.get(0), 0);
}

MySqlConnection.MySqlConnectionConfiguration config = connection.connectionConfig();
BinaryLogClient client =
new BinaryLogClient(
config.hostname(), config.port(), config.username(), config.password());
BinaryLogClient client = createBinaryClient(mySqlSourceConfig.getDbzConfiguration());
if (mySqlSourceConfig.getServerIdRange() != null) {
client.setServerId(mySqlSourceConfig.getServerIdRange().getStartServerId());
}
Expand All @@ -266,8 +266,7 @@ public static BinlogOffset findBinlogOffset(
}

private static String searchBinlogName(
BinaryLogClient client, long targetMs, List<String> binlogFiles)
throws IOException, InterruptedException {
BinaryLogClient client, long targetMs, List<String> binlogFiles) throws IOException {
int startIdx = 0;
int endIdx = binlogFiles.size() - 1;

Expand All @@ -288,11 +287,13 @@ private static String searchBinlogName(
}
}

return endIdx < 0 ? binlogFiles.get(0) : binlogFiles.get(endIdx);
return binlogFiles.isEmpty()
? ""
: endIdx < 0 ? binlogFiles.get(0) : binlogFiles.get(endIdx);
}

private static long getBinlogTimestamp(BinaryLogClient client, String binlogFile)
throws IOException, InterruptedException {
throws IOException {

ArrayBlockingQueue<Long> binlogTimestamps = new ArrayBlockingQueue<>(1);
BinaryLogClient.EventListener eventListener =
Expand All @@ -318,24 +319,11 @@ private static long getBinlogTimestamp(BinaryLogClient client, String binlogFile

ArrayBlockingQueue<Exception> exceptions = new ArrayBlockingQueue<>(1);
BinaryLogClient.LifecycleListener lifecycleListener =
new BinaryLogClient.LifecycleListener() {

@Override
public void onConnect(BinaryLogClient client) {}

new BinaryLogClient.AbstractLifecycleListener() {
@Override
public void onCommunicationFailure(BinaryLogClient client, Exception e) {
LOG.error("BinaryLogClient onCommunicationFailure", e);
exceptions.add(e);
}

@Override
public void onEventDeserializationFailure(BinaryLogClient client, Exception e) {
LOG.warn("BinaryLogClient onEventDeserializationFailure", e);
}

@Override
public void onDisconnect(BinaryLogClient client) {}
};

client.registerEventListener(eventListener);
Expand All @@ -347,6 +335,7 @@ public void onDisconnect(BinaryLogClient client) {}
try {
client.connect();
} finally {
client.disconnect();
client.unregisterLifecycleListener(lifecycleListener);
client.unregisterEventListener(eventListener);
}
Expand Down

0 comments on commit 6240cba

Please sign in to comment.