[Improve] [CDC Base] Add a fast sampling method that supports character types #5179

Merged · 2 commits · Aug 10, 2023
@@ -112,6 +112,19 @@ private List<ChunkRange> splitTableIntoChunks(
final int chunkSize = sourceConfig.getSplitSize();
final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper();
final double distributionFactorLower = sourceConfig.getDistributionFactorLower();
final int sampleShardingThreshold = sourceConfig.getSampleShardingThreshold();

log.info(
"Splitting table {} into chunks, split column: {}, min: {}, max: {}, chunk size: {}, "
+ "distribution factor upper: {}, distribution factor lower: {}, sample sharding threshold: {}",
tableId,
splitColumnName,
min,
max,
chunkSize,
distributionFactorUpper,
distributionFactorLower,
sampleShardingThreshold);

if (isEvenlySplitColumn(splitColumn)) {
long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
@@ -130,7 +143,7 @@ private List<ChunkRange> splitTableIntoChunks(
} else {
int shardCount = (int) (approximateRowCnt / chunkSize);
int inverseSamplingRate = sourceConfig.getInverseSamplingRate();
if (sourceConfig.getSampleShardingThreshold() < shardCount) {
if (sampleShardingThreshold < shardCount) {
// It is necessary to ensure that the number of data rows sampled by the
// sampling rate is greater than the number of shards.
// Otherwise, if the sampling rate is too low, it may result in an insufficient
@@ -144,9 +157,17 @@
chunkSize);
inverseSamplingRate = chunkSize;
}
log.info(
"Use sampling sharding for table {}, the sampling rate is {}",
tableId,
inverseSamplingRate);
Object[] sample =
sampleDataFromColumn(
jdbc, tableId, splitColumnName, inverseSamplingRate);
log.info(
"Sample data from table {} end, the sample size is {}",
tableId,
sample.length);
return efficientShardingThroughSampling(
tableId, sample, approximateRowCnt, shardCount);
}
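
To put concrete numbers on the branch above, the sketch below walks through the same decision with assumed values; the row count, split size, and option values are illustrative and not taken from this PR.

/** Illustrative walk-through of the sampling-sharding decision above; all values are assumptions. */
public class SampleShardingMathExample {
    public static void main(String[] args) {
        long approximateRowCnt = 100_000_000L; // what queryApproximateRowCnt(jdbc, tableId) might return
        int chunkSize = 8096;                  // sourceConfig.getSplitSize()
        int sampleShardingThreshold = 1000;    // sourceConfig.getSampleShardingThreshold()
        int inverseSamplingRate = 1000;        // sourceConfig.getInverseSamplingRate()

        int shardCount = (int) (approximateRowCnt / chunkSize); // 12_351 shards
        boolean useSampling = sampleShardingThreshold < shardCount; // true, so sampling sharding is chosen

        // The diff caps inverseSamplingRate at chunkSize so the sample holds at least one value per
        // shard (rowCnt / inverseSamplingRate >= rowCnt / chunkSize only when inverseSamplingRate <= chunkSize).
        if (inverseSamplingRate > chunkSize) {
            inverseSamplingRate = chunkSize;
        }
        long expectedSampleSize = approximateRowCnt / inverseSamplingRate; // 100_000 sampled values

        System.out.printf(
                "useSampling=%s, shardCount=%d, expectedSampleSize=%d%n",
                useSampling, shardCount, expectedSampleSize);
    }
}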
@@ -68,7 +68,8 @@ public OptionRule optionRule() {
JdbcSourceOptions.CONNECTION_POOL_SIZE,
JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND,
JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND,
JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD)
JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD,
JdbcSourceOptions.INVERSE_SAMPLING_RATE)
.optional(MySqlSourceOptions.STARTUP_MODE, MySqlSourceOptions.STOP_MODE)
.conditional(
MySqlSourceOptions.STARTUP_MODE,
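
The hunk above only registers JdbcSourceOptions.INVERSE_SAMPLING_RATE as an optional option; the option's declaration is not part of this diff. For orientation, a SeaTunnel option of this kind is typically declared roughly as sketched below; the key name and default value here are assumptions, not taken from this change.

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;

// Hypothetical sketch of how INVERSE_SAMPLING_RATE might look in JdbcSourceOptions.
// The key and default value are placeholders for illustration only.
public class JdbcSourceOptionsSketch {
    public static final Option<Integer> INVERSE_SAMPLING_RATE =
            Options.key("inverse-sampling.rate")
                    .intType()
                    .defaultValue(1000)
                    .withDescription(
                            "The inverse of the sampling rate: keep one row out of every N rows "
                                    + "when sampling-based sharding is applied.");
}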
@@ -28,10 +28,12 @@
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.TableId;
import lombok.extern.slf4j.Slf4j;

import java.sql.SQLException;

/** The {@code ChunkSplitter} used to split table into a set of chunks for JDBC data source. */
@Slf4j
public class MySqlChunkSplitter extends AbstractJdbcSourceChunkSplitter {

public MySqlChunkSplitter(JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dialect) {
@@ -55,7 +57,7 @@ public Object queryMin(
public Object[] sampleDataFromColumn(
JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate)
throws SQLException {
return MySqlUtils.sampleDataFromColumn(jdbc, tableId, columnName, inverseSamplingRate);
return MySqlUtils.skipReadAndSortSampleData(jdbc, tableId, columnName, inverseSamplingRate);
}

@Override
@@ -36,11 +36,13 @@
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.SchemaNameAdjuster;
import lombok.extern.slf4j.Slf4j;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -52,6 +54,7 @@
import static org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.rowToArray;

/** Utils to prepare MySQL SQL statement. */
@Slf4j
public class MySqlUtils {

private MySqlUtils() {}
@@ -142,6 +145,56 @@ public static Object[] sampleDataFromColumn(
});
}

public static Object[] skipReadAndSortSampleData(
JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate)
throws SQLException {
final String sampleQuery =
String.format("SELECT %s FROM %s", quote(columnName), quote(tableId));

Statement stmt = null;
ResultSet rs = null;

List<Object> results = new ArrayList<>();
try {
stmt =
jdbc.connection()
.createStatement(
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);

stmt.setFetchSize(Integer.MIN_VALUE);
rs = stmt.executeQuery(sampleQuery);

int count = 0;
while (rs.next()) {
count++;
if (count % 100000 == 0) {
log.info("Processing row index: {}", count);
}
if (count % inverseSamplingRate == 0) {
results.add(rs.getObject(1));
}
}
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException e) {
log.error("Failed to close ResultSet", e);
}
}
if (stmt != null) {
try {
stmt.close();
} catch (SQLException e) {
log.error("Failed to close Statement", e);
}
}
}
Object[] resultsArray = results.toArray();
Arrays.sort(resultsArray);
return resultsArray;
}

public static Object queryNextChunkMax(
JdbcConnection jdbc,
TableId tableId,
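
The new MySqlUtils.skipReadAndSortSampleData above streams the whole split column (a fetch size of Integer.MIN_VALUE is MySQL Connector/J's row-streaming hint), keeps every inverseSamplingRate-th value, then sorts the collected samples. Its caller, efficientShardingThroughSampling, is not shown in this diff; the sketch below only illustrates the general idea of deriving chunk boundaries from a sorted sample and is not the actual implementation.

// Illustrative only: one way a sorted sample can be turned into chunk boundaries.
// This is NOT the efficientShardingThroughSampling implementation from the codebase.
// Uses java.util.List and java.util.ArrayList.
static List<Object[]> boundariesFromSortedSample(Object[] sortedSample, int shardCount) {
    List<Object[]> ranges = new ArrayList<>();
    if (sortedSample.length == 0 || shardCount <= 0) {
        ranges.add(new Object[] {null, null}); // single unbounded chunk
        return ranges;
    }
    int step = Math.max(1, sortedSample.length / shardCount);
    Object lower = null; // null = open lower bound for the first chunk
    for (int i = step; i < sortedSample.length; i += step) {
        ranges.add(new Object[] {lower, sortedSample[i]});
        lower = sortedSample[i];
    }
    ranges.add(new Object[] {lower, null}); // open upper bound for the last chunk
    return ranges;
}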
@@ -57,7 +57,8 @@ public Object queryMin(
public Object[] sampleDataFromColumn(
JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate)
throws SQLException {
return SqlServerUtils.sampleDataFromColumn(jdbc, tableId, columnName, inverseSamplingRate);
return SqlServerUtils.skipReadAndSortSampleData(
jdbc, tableId, columnName, inverseSamplingRate);
}

@Override
@@ -39,10 +39,13 @@
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.SchemaNameAdjuster;
import lombok.extern.slf4j.Slf4j;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -52,6 +55,7 @@
import java.util.Optional;

/** The utils for SqlServer data source. */
@Slf4j
public class SqlServerUtils {

public SqlServerUtils() {}
@@ -145,6 +149,56 @@ public static Object[] sampleDataFromColumn(
});
}

public static Object[] skipReadAndSortSampleData(
JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate)
throws SQLException {
final String sampleQuery =
String.format("SELECT %s FROM %s", quote(columnName), quote(tableId));

Statement stmt = null;
ResultSet rs = null;

List<Object> results = new ArrayList<>();
try {
stmt =
jdbc.connection()
.createStatement(
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);

stmt.setFetchSize(Integer.MIN_VALUE);
rs = stmt.executeQuery(sampleQuery);

int count = 0;
while (rs.next()) {
count++;
if (count % 100000 == 0) {
log.info("Processing row index: {}", count);
}
if (count % inverseSamplingRate == 0) {
results.add(rs.getObject(1));
}
}
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException e) {
log.error("Failed to close ResultSet", e);
}
}
if (stmt != null) {
try {
stmt.close();
} catch (SQLException e) {
log.error("Failed to close Statement", e);
}
}
}
Object[] resultsArray = results.toArray();
Arrays.sort(resultsArray);
return resultsArray;
}

/**
* Returns the next LSN to be read from the database. This is the LSN of the last record that
* was read from the database.
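
Both new sampling methods call stmt.setFetchSize(Integer.MIN_VALUE). That negative sentinel is a MySQL Connector/J-specific hint that switches the driver into row-by-row streaming; the JDBC contract only permits non-negative fetch sizes, so other drivers (including the SQL Server driver) may reject it. If portability becomes a concern, a driver-neutral way to keep the scan memory-bounded is a modest positive fetch size, as in the hypothetical variant below; this is an assumption-laden sketch, not part of the PR.

// Hypothetical, driver-neutral variant of the sampling scan: a bounded, positive fetch
// size instead of MySQL's Integer.MIN_VALUE streaming sentinel. Not part of this change.
public static Object[] sampleWithPositiveFetchSize(
        JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate)
        throws SQLException {
    final String sampleQuery =
            String.format("SELECT %s FROM %s", quote(columnName), quote(tableId));
    List<Object> results = new ArrayList<>();
    try (Statement stmt =
            jdbc.connection()
                    .createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
        stmt.setFetchSize(10_000); // positive hint: the driver fetches rows in batches
        try (ResultSet rs = stmt.executeQuery(sampleQuery)) {
            int count = 0;
            while (rs.next()) {
                count++;
                if (count % inverseSamplingRate == 0) {
                    results.add(rs.getObject(1));
                }
            }
        }
    }
    Object[] resultsArray = results.toArray();
    Arrays.sort(resultsArray);
    return resultsArray;
}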