diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java index e956b111709..e99e7dab4b1 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java @@ -112,6 +112,19 @@ private List splitTableIntoChunks( final int chunkSize = sourceConfig.getSplitSize(); final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper(); final double distributionFactorLower = sourceConfig.getDistributionFactorLower(); + final int sampleShardingThreshold = sourceConfig.getSampleShardingThreshold(); + + log.info( + "Splitting table {} into chunks, split column: {}, min: {}, max: {}, chunk size: {}, " + + "distribution factor upper: {}, distribution factor lower: {}, sample sharding threshold: {}", + tableId, + splitColumnName, + min, + max, + chunkSize, + distributionFactorUpper, + distributionFactorLower, + sampleShardingThreshold); if (isEvenlySplitColumn(splitColumn)) { long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId); @@ -130,7 +143,7 @@ private List splitTableIntoChunks( } else { int shardCount = (int) (approximateRowCnt / chunkSize); int inverseSamplingRate = sourceConfig.getInverseSamplingRate(); - if (sourceConfig.getSampleShardingThreshold() < shardCount) { + if (sampleShardingThreshold < shardCount) { // It is necessary to ensure that the number of data rows sampled by the // sampling rate is greater than the number of shards. 
// Otherwise, if the sampling rate is too low, it may result in an insufficient @@ -144,9 +157,17 @@ private List splitTableIntoChunks( chunkSize); inverseSamplingRate = chunkSize; } + log.info( + "Use sampling sharding for table {}, the sampling rate is {}", + tableId, + inverseSamplingRate); Object[] sample = sampleDataFromColumn( jdbc, tableId, splitColumnName, inverseSamplingRate); + log.info( + "Sample data from table {} end, the sample size is {}", + tableId, + sample.length); return efficientShardingThroughSampling( tableId, sample, approximateRowCnt, shardCount); } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java index 396fd7bae9d..a84eb79be3e 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java @@ -68,7 +68,8 @@ public OptionRule optionRule() { JdbcSourceOptions.CONNECTION_POOL_SIZE, JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND, JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND, - JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD) + JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD, + JdbcSourceOptions.INVERSE_SAMPLING_RATE) .optional(MySqlSourceOptions.STARTUP_MODE, MySqlSourceOptions.STOP_MODE) .conditional( MySqlSourceOptions.STARTUP_MODE, diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java index 0249889b239..c078f7cf28c 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java @@ -28,10 +28,12 @@ import io.debezium.jdbc.JdbcConnection; import io.debezium.relational.Column; import io.debezium.relational.TableId; +import lombok.extern.slf4j.Slf4j; import java.sql.SQLException; /** The {@code ChunkSplitter} used to split table into a set of chunks for JDBC data source. */ +@Slf4j public class MySqlChunkSplitter extends AbstractJdbcSourceChunkSplitter { public MySqlChunkSplitter(JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dialect) { @@ -55,7 +57,7 @@ public Object queryMin( public Object[] sampleDataFromColumn( JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate) throws SQLException { - return MySqlUtils.sampleDataFromColumn(jdbc, tableId, columnName, inverseSamplingRate); + return MySqlUtils.skipReadAndSortSampleData(jdbc, tableId, columnName, inverseSamplingRate); } @Override diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java index c9223c81ff2..fb00020644f 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java +++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
@@ -36,11 +36,13 @@
 import io.debezium.relational.TableId;
 import io.debezium.schema.TopicSelector;
 import io.debezium.util.SchemaNameAdjuster;
+import lombok.extern.slf4j.Slf4j;
 
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -52,6 +54,7 @@
 import static org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.rowToArray;
 
 /** Utils to prepare MySQL SQL statement. */
+@Slf4j
 public class MySqlUtils {
     private MySqlUtils() {}
 
@@ -142,6 +145,83 @@ public static Object[] sampleDataFromColumn(
                 });
     }
 
+    /**
+     * Samples the split column by scanning the whole table once and keeping every
+     * {@code inverseSamplingRate}-th value, then returns the kept values sorted in their natural
+     * order.
+     *
+     * <p>The kept values are ordered with {@link Arrays#sort(Object[])}, so they must be mutually
+     * comparable. {@code NULL} column values are skipped because they cannot be sorted.
+     *
+     * @param jdbc connection used to run the sampling query
+     * @param tableId table to sample
+     * @param columnName split column whose values are sampled
+     * @param inverseSamplingRate keep one row out of this many; must be positive
+     * @return the sampled values in ascending order, empty when no row was kept
+     * @throws SQLException if the query fails or the result set cannot be read
+     * @throws IllegalArgumentException if {@code inverseSamplingRate} is not positive
+     */
+    public static Object[] skipReadAndSortSampleData(
+            JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate)
+            throws SQLException {
+        if (inverseSamplingRate <= 0) {
+            throw new IllegalArgumentException(
+                    "inverseSamplingRate must be positive, but was " + inverseSamplingRate);
+        }
+        final String sampleQuery =
+                String.format("SELECT %s FROM %s", quote(columnName), quote(tableId));
+
+        Statement stmt = null;
+        ResultSet rs = null;
+
+        List<Object> results = new ArrayList<>();
+        try {
+            stmt =
+                    jdbc.connection()
+                            .createStatement(
+                                    ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+
+            // Integer.MIN_VALUE asks MySQL Connector/J to stream rows one at a time instead of
+            // buffering the whole result set in memory.
+            stmt.setFetchSize(Integer.MIN_VALUE);
+            rs = stmt.executeQuery(sampleQuery);
+
+            // long counter: an int would wrap on tables with more than Integer.MAX_VALUE rows.
+            long count = 0;
+            while (rs.next()) {
+                count++;
+                if (count % 100000 == 0) {
+                    log.info("Processing row index: {}", count);
+                }
+                if (count % inverseSamplingRate == 0) {
+                    Object value = rs.getObject(1);
+                    if (value != null) {
+                        results.add(value);
+                    }
+                }
+            }
+        } finally {
+            // Closing is best effort: failures are logged rather than propagated.
+            if (rs != null) {
+                try {
+                    rs.close();
+                } catch (SQLException e) {
+                    log.error("Failed to close ResultSet", e);
+                }
+            }
+            if (stmt != null) {
+                try {
+                    stmt.close();
+                } catch (SQLException e) {
+                    log.error("Failed to close Statement", e);
+                }
+            }
+        }
+        Object[] resultsArray = results.toArray();
+        Arrays.sort(resultsArray);
+        return resultsArray;
+    }
+
     public static Object
queryNextChunkMax( JdbcConnection jdbc, TableId tableId, diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java index 7efd53dc3fc..1dc97020be5 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java @@ -57,7 +57,8 @@ public Object queryMin( public Object[] sampleDataFromColumn( JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate) throws SQLException { - return SqlServerUtils.sampleDataFromColumn(jdbc, tableId, columnName, inverseSamplingRate); + return SqlServerUtils.skipReadAndSortSampleData( + jdbc, tableId, columnName, inverseSamplingRate); } @Override diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java index a1271849843..d6e58825dab 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java @@ -39,10 +39,13 @@ 
import io.debezium.relational.TableId; import io.debezium.schema.TopicSelector; import io.debezium.util.SchemaNameAdjuster; +import lombok.extern.slf4j.Slf4j; import java.sql.Connection; import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Statement; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -52,6 +55,7 @@ import java.util.Optional; /** The utils for SqlServer data source. */ +@Slf4j public class SqlServerUtils { public SqlServerUtils() {} @@ -145,6 +149,56 @@ public static Object[] sampleDataFromColumn( }); } + public static Object[] skipReadAndSortSampleData( + JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate) + throws SQLException { + final String sampleQuery = + String.format("SELECT %s FROM %s", quote(columnName), quote(tableId)); + + Statement stmt = null; + ResultSet rs = null; + + List results = new ArrayList<>(); + try { + stmt = + jdbc.connection() + .createStatement( + ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + + stmt.setFetchSize(Integer.MIN_VALUE); + rs = stmt.executeQuery(sampleQuery); + + int count = 0; + while (rs.next()) { + count++; + if (count % 100000 == 0) { + log.info("Processing row index: {}", count); + } + if (count % inverseSamplingRate == 0) { + results.add(rs.getObject(1)); + } + } + } finally { + if (rs != null) { + try { + rs.close(); + } catch (SQLException e) { + log.error("Failed to close ResultSet", e); + } + } + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + log.error("Failed to close Statement", e); + } + } + } + Object[] resultsArray = results.toArray(); + Arrays.sort(resultsArray); + return resultsArray; + } + /** * Returns the next LSN to be read from the database. This is the LSN of the last record that * was read from the database.