Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-36024] Handle BinaryLogClient exceptions when searching for starting binlog offset #3524

Closed
wants to merge 3 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@

import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -242,33 +241,23 @@ private static Map<String, String> querySystemVariables(

public static BinlogOffset findBinlogOffset(
long targetMs, MySqlConnection connection, MySqlSourceConfig mySqlSourceConfig) {
MySqlConnection.MySqlConnectionConfiguration config = connection.connectionConfig();
BinaryLogClient client =
new BinaryLogClient(
config.hostname(), config.port(), config.username(), config.password());
List<String> binlogFiles = connection.availableBinlogFiles();
LOG.info("Available binlog files: {}", binlogFiles);

if (binlogFiles.isEmpty()) {
return BinlogOffset.ofBinlogFilePosition("", 0);
}
if (binlogFiles.size() == 1) {
return BinlogOffset.ofBinlogFilePosition(binlogFiles.get(0), 0);
}

BinaryLogClient client = createBinaryClient(mySqlSourceConfig.getDbzConfiguration());
if (mySqlSourceConfig.getServerIdRange() != null) {
client.setServerId(mySqlSourceConfig.getServerIdRange().getStartServerId());
}
List<String> binlogFiles = new ArrayList<>();
JdbcConnection.ResultSetConsumer rsc =
rs -> {
while (rs.next()) {
String fileName = rs.getString(1);
long fileSize = rs.getLong(2);
if (fileSize > 0) {
binlogFiles.add(fileName);
}
}
};

LOG.info("Start querying binlog files for timestamp {}", targetMs);
try {
connection.query("SHOW BINARY LOGS", rsc);
LOG.info("Total search binlog: {}", binlogFiles);

if (binlogFiles.isEmpty()) {
return BinlogOffset.ofBinlogFilePosition("", 0);
}

String binlogName = searchBinlogName(client, targetMs, binlogFiles);
return BinlogOffset.ofBinlogFilePosition(binlogName, 0);
} catch (Exception e) {
Expand All @@ -277,14 +266,18 @@ public static BinlogOffset findBinlogOffset(
}

private static String searchBinlogName(
BinaryLogClient client, long targetMs, List<String> binlogFiles)
throws IOException, InterruptedException {
BinaryLogClient client, long targetMs, List<String> binlogFiles) throws IOException {
int startIdx = 0;
int endIdx = binlogFiles.size() - 1;

while (startIdx <= endIdx) {
int mid = startIdx + (endIdx - startIdx) / 2;
long midTs = getBinlogTimestamp(client, binlogFiles.get(mid));
if (midTs < 0) {
binlogFiles.remove(mid);
endIdx--;
continue;
}
if (midTs < targetMs) {
startIdx = mid + 1;
} else if (targetMs < midTs) {
Expand All @@ -294,11 +287,13 @@ private static String searchBinlogName(
}
}

return endIdx < 0 ? binlogFiles.get(0) : binlogFiles.get(endIdx);
return binlogFiles.isEmpty()
? ""
: endIdx < 0 ? binlogFiles.get(0) : binlogFiles.get(endIdx);
}

private static long getBinlogTimestamp(BinaryLogClient client, String binlogFile)
throws IOException, InterruptedException {
throws IOException {

ArrayBlockingQueue<Long> binlogTimestamps = new ArrayBlockingQueue<>(1);
BinaryLogClient.EventListener eventListener =
Expand All @@ -322,16 +317,34 @@ private static long getBinlogTimestamp(BinaryLogClient client, String binlogFile
}
};

try {
client.registerEventListener(eventListener);
client.setBinlogFilename(binlogFile);
client.setBinlogPosition(0);
ArrayBlockingQueue<Exception> exceptions = new ArrayBlockingQueue<>(1);
BinaryLogClient.LifecycleListener lifecycleListener =
new BinaryLogClient.AbstractLifecycleListener() {
@Override
public void onCommunicationFailure(BinaryLogClient client, Exception e) {
exceptions.add(e);
}
};

LOG.info("begin parse binlog: {}", binlogFile);
client.registerEventListener(eventListener);
client.registerLifecycleListener(lifecycleListener);
client.setBinlogFilename(binlogFile);
client.setBinlogPosition(0);

LOG.info("Start parsing binlog: {}", binlogFile);
try {
client.connect();
} finally {
client.disconnect();
client.unregisterLifecycleListener(lifecycleListener);
client.unregisterEventListener(eventListener);
}
return binlogTimestamps.take();

Exception exception = exceptions.peek();
if (exception != null) {
throw new RuntimeException(exception);
}
Long ts = binlogTimestamps.peek();
return ts == null ? -1L : ts;
}
}
Loading