From edf9061df10b96417a0fc9217f322d452c3ad8c3 Mon Sep 17 00:00:00 2001 From: liuli Date: Tue, 30 May 2023 00:25:24 +0800 Subject: [PATCH 1/3] [improve][CDC base] Implement Sample-based Sharding Strategy --- .../cdc/base/config/BaseSourceConfig.java | 6 +++ .../cdc/base/config/JdbcSourceConfig.java | 4 ++ .../base/config/JdbcSourceConfigFactory.java | 39 +++++++++++++++- .../cdc/base/option/JdbcSourceOptions.java | 23 +++++++++- .../splitter/JdbcSourceChunkSplitter.java | 16 +++++++ .../cdc/mysql/config/MySqlSourceConfig.java | 4 ++ .../config/MySqlSourceConfigFactory.java | 2 + .../source/eumerator/MySqlChunkSplitter.java | 46 +++++++++++++++++++ .../seatunnel/cdc/mysql/utils/MySqlUtils.java | 25 ++++++++++ .../source/config/SqlServerSourceConfig.java | 4 ++ .../config/SqlServerSourceConfigFactory.java | 2 + .../eumerator/SqlServerChunkSplitter.java | 46 +++++++++++++++++++ .../source/utils/SqlServerUtils.java | 25 ++++++++++ 13 files changed, 239 insertions(+), 3 deletions(-) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/BaseSourceConfig.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/BaseSourceConfig.java index 7cab3375514..e2016f1fe0f 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/BaseSourceConfig.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/BaseSourceConfig.java @@ -37,6 +37,8 @@ public abstract class BaseSourceConfig implements SourceConfig { @Getter protected final double distributionFactorUpper; @Getter protected final double distributionFactorLower; + @Getter protected final int sampleShardingThreshold; + @Getter protected final int inverseSamplingRate; // 
-------------------------------------------------------------------------------------------- // Debezium Configurations @@ -49,12 +51,16 @@ public BaseSourceConfig( int splitSize, double distributionFactorUpper, double distributionFactorLower, + int sampleShardingThreshold, + int inverseSamplingRate, Properties dbzProperties) { this.startupConfig = startupConfig; this.stopConfig = stopConfig; this.splitSize = splitSize; this.distributionFactorUpper = distributionFactorUpper; this.distributionFactorLower = distributionFactorLower; + this.sampleShardingThreshold = sampleShardingThreshold; + this.inverseSamplingRate = inverseSamplingRate; this.dbzProperties = dbzProperties; } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfig.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfig.java index 0b670bbe721..aa9218c20e8 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfig.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfig.java @@ -51,6 +51,8 @@ public JdbcSourceConfig( int splitSize, double distributionFactorUpper, double distributionFactorLower, + int sampleShardingThreshold, + int inverseSamplingRate, Properties dbzProperties, String driverClassName, String hostname, @@ -69,6 +71,8 @@ public JdbcSourceConfig( splitSize, distributionFactorUpper, distributionFactorLower, + sampleShardingThreshold, + inverseSamplingRate, dbzProperties); this.driverClassName = driverClassName; this.hostname = hostname; diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java index 14debbbe653..c58e18489b1 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java @@ -42,8 +42,13 @@ public abstract class JdbcSourceConfigFactory implements SourceConfig.Factory CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND = Options.key("chunk-key.even-distribution.factor.upper-bound") .doubleType() - .defaultValue(1000.0d) + .defaultValue(100.0d) .withDescription( "The upper bound of chunk key distribution factor. The distribution factor is used to determine whether the" + " table is evenly distribution or not." @@ -118,4 +118,25 @@ public class JdbcSourceOptions extends SourceOptions { + " The table chunks would use evenly calculation optimization when the data distribution is even," + " and the query for splitting would happen when it is uneven." + " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount."); + + public static final Option SAMPLE_SHARDING_THRESHOLD = + Options.key("sample-sharding.threshold") + .intType() + .defaultValue(1000) // 1000 shards + .withDescription( + "The threshold of estimated shard count to trigger the sample sharding strategy. " + + "When the distribution factor is outside the upper and lower bounds, " + + "and if the estimated shard count (approximateRowCnt/chunkSize) exceeds this threshold, " + + "the sample sharding strategy will be used. " + + "This strategy can help to handle large datasets more efficiently. 
" + "The default value is 1000 shards."); + public static final Option INVERSE_SAMPLING_RATE = + Options.key("inverse-sampling.rate") + .intType() + .defaultValue(1000) // 1/1000 sampling rate + .withDescription( + "The inverse of the sampling rate for the sample sharding strategy. " + + "The value represents the denominator of the sampling rate fraction. " + + "For example, a value of 1000 means a sampling rate of 1/1000. " + + "This parameter is used when the sample sharding strategy is triggered."); } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java index d700b112ed4..9e42d552635 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java @@ -62,6 +62,22 @@ Object queryMin( JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound) throws SQLException; + /** + * Performs a sampling operation on the specified column of a table in a JDBC-connected + * database. + * + * @param jdbc The JDBC connection object used to connect to the database. + * @param tableId The ID of the table in which the column resides. + * @param columnName The name of the column to be sampled. + * @param samplingRate The inverse of the fraction of the data to be sampled from + * the column. For example, a value of 1000 would mean 1/1000 of the data will be sampled. + * @return Returns an array of sampled data from the specified column. 
+ * @throws SQLException If an SQL error occurs during the sampling operation. + */ + Object[] sampleDataFromColumn( + JdbcConnection jdbc, TableId tableId, String columnName, int samplingRate) + throws SQLException; + /** * Query the maximum value of the next chunk, and the next chunk must be greater than or equal * to includedLowerBound value [min_1, max_1), [min_2, max_2),... [min_n, null). diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfig.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfig.java index 02df33bd3ee..9b18f202eca 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfig.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfig.java @@ -43,6 +43,8 @@ public MySqlSourceConfig( int splitSize, double distributionFactorUpper, double distributionFactorLower, + int sampleShardingThreshold, + int inverseSamplingRate, Properties dbzProperties, String driverClassName, String hostname, @@ -63,6 +65,8 @@ public MySqlSourceConfig( splitSize, distributionFactorUpper, distributionFactorLower, + sampleShardingThreshold, + inverseSamplingRate, dbzProperties, driverClassName, hostname, diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java index 9275e45c23f..02bb1343f70 100644 --- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java @@ -117,6 +117,8 @@ public MySqlSourceConfig create(int subtaskId) { splitSize, distributionFactorUpper, distributionFactorLower, + sampleShardingThreshold, + inverseSamplingRate, props, driverClassName, hostname, diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java index c248edd69d7..60b02c27c7d 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java @@ -118,6 +118,13 @@ public Object queryMin( return MySqlUtils.queryMin(jdbc, tableId, columnName, excludedLowerBound); } + @Override + public Object[] sampleDataFromColumn( + JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate) + throws SQLException { + return MySqlUtils.sampleDataFromColumn(jdbc, tableId, columnName, inverseSamplingRate); + } + @Override public Object queryNextChunkMax( JdbcConnection jdbc, @@ -188,6 +195,22 @@ private List splitTableIntoChunks( return splitEvenlySizedChunks( tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize); } else { + int shardCount = (int) (approximateRowCnt / chunkSize); + if (sourceConfig.getSampleShardingThreshold() < shardCount) { + 
Object[] sample = + sampleDataFromColumn( + jdbc, + tableId, + splitColumnName, + sourceConfig.getInverseSamplingRate()); + // In order to prevent data loss due to the absence of the minimum value in the + // sampled data, the minimum value is directly added here. + Object[] newSample = new Object[sample.length + 1]; + newSample[0] = min; + System.arraycopy(sample, 0, newSample, 1, sample.length); + return efficientShardingThroughSampling( + tableId, newSample, approximateRowCnt, shardCount); + } return splitUnevenlySizedChunks( jdbc, tableId, splitColumnName, min, max, chunkSize); } @@ -196,6 +219,29 @@ private List splitTableIntoChunks( } } + private List efficientShardingThroughSampling( + TableId tableId, Object[] sampleData, long approximateRowCnt, int shardCount) { + LOG.info( + "Use efficient sharding through sampling optimization for table {}, the approximate row count is {}, the shardCount is {}", + tableId, + approximateRowCnt, + shardCount); + + final List splits = new ArrayList<>(); + + // Calculate the shard boundaries + for (int i = 0; i < shardCount; i++) { + Object chunkStart = sampleData[(int) ((long) i * sampleData.length / shardCount)]; + Object chunkEnd = + i < shardCount - 1 + ? sampleData[(int) (((long) i + 1) * sampleData.length / shardCount)] + : null; + splits.add(ChunkRange.of(chunkStart, chunkEnd)); + } + + return splits; + } + /** * Split table into evenly sized chunks based on the numeric min and max value of split column, * and tumble chunks in step size. 
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java index 8fe3511d6f6..c9223c81ff2 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java @@ -41,6 +41,7 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; @@ -117,6 +118,30 @@ public static Object queryMin( }); } + public static Object[] sampleDataFromColumn( + JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate) + throws SQLException { + final String minQuery = + String.format( + "SELECT %s FROM %s WHERE MOD((%s - (SELECT MIN(%s) FROM %s)), %s) = 0 ORDER BY %s", + quote(columnName), + quote(tableId), + quote(columnName), + quote(columnName), + quote(tableId), + inverseSamplingRate, + quote(columnName)); + return jdbc.queryAndMap( + minQuery, + resultSet -> { + List results = new ArrayList<>(); + while (resultSet.next()) { + results.add(resultSet.getObject(1)); + } + return results.toArray(); + }); + } + public static Object queryNextChunkMax( JdbcConnection jdbc, TableId tableId, diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/config/SqlServerSourceConfig.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/config/SqlServerSourceConfig.java 
index bbf5eb026ac..97f9bad2bd9 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/config/SqlServerSourceConfig.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/config/SqlServerSourceConfig.java @@ -43,6 +43,8 @@ public SqlServerSourceConfig( int splitSize, double distributionFactorUpper, double distributionFactorLower, + int sampleShardingThreshold, + int inverseSamplingRate, Properties dbzProperties, String driverClassName, String hostname, @@ -63,6 +65,8 @@ public SqlServerSourceConfig( splitSize, distributionFactorUpper, distributionFactorLower, + sampleShardingThreshold, + inverseSamplingRate, dbzProperties, driverClassName, hostname, diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/config/SqlServerSourceConfigFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/config/SqlServerSourceConfigFactory.java index 4cb63a26e69..69acd3190af 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/config/SqlServerSourceConfigFactory.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/config/SqlServerSourceConfigFactory.java @@ -85,6 +85,8 @@ public SqlServerSourceConfig create(int subtask) { splitSize, distributionFactorUpper, distributionFactorLower, + sampleShardingThreshold, + inverseSamplingRate, props, DRIVER_CLASS_NAME, hostname, diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java index d25269ee5db..7d9342851ad 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java @@ -115,6 +115,13 @@ public Object queryMin( return SqlServerUtils.queryMin(jdbc, tableId, columnName, excludedLowerBound); } + @Override + public Object[] sampleDataFromColumn( + JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate) + throws SQLException { + return SqlServerUtils.sampleDataFromColumn(jdbc, tableId, columnName, inverseSamplingRate); + } + @Override public Object queryNextChunkMax( JdbcConnection jdbc, @@ -185,6 +192,22 @@ private List splitTableIntoChunks( return splitEvenlySizedChunks( tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize); } else { + int shardCount = (int) (approximateRowCnt / chunkSize); + if (sourceConfig.getSampleShardingThreshold() < shardCount) { + Object[] sample = + sampleDataFromColumn( + jdbc, + tableId, + splitColumnName, + sourceConfig.getInverseSamplingRate()); + // In order to prevent data loss due to the absence of the minimum value in the + // sampled data, the minimum value is directly added here. 
+ Object[] newSample = new Object[sample.length + 1]; + newSample[0] = min; + System.arraycopy(sample, 0, newSample, 1, sample.length); + return efficientShardingThroughSampling( + tableId, newSample, approximateRowCnt, shardCount); + } return splitUnevenlySizedChunks( jdbc, tableId, splitColumnName, min, max, chunkSize); } @@ -193,6 +216,29 @@ private List splitTableIntoChunks( } } + private List efficientShardingThroughSampling( + TableId tableId, Object[] sampleData, long approximateRowCnt, int shardCount) { + log.info( + "Use efficient sharding through sampling optimization for table {}, the approximate row count is {}, the shardCount is {}", + tableId, + approximateRowCnt, + shardCount); + + final List splits = new ArrayList<>(); + + // Calculate the shard boundaries + for (int i = 0; i < shardCount; i++) { + Object chunkStart = sampleData[(int) ((long) i * sampleData.length / shardCount)]; + Object chunkEnd = + i < shardCount - 1 + ? sampleData[(int) (((long) i + 1) * sampleData.length / shardCount)] + : null; + splits.add(ChunkRange.of(chunkStart, chunkEnd)); + } + + return splits; + } + /** * Split table into evenly sized chunks based on the numeric min and max value of split column, * and tumble chunks in step size. 
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java index b07d65c7cfa..a1271849843 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java @@ -43,6 +43,7 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; @@ -120,6 +121,30 @@ public static Object queryMin( }); } + public static Object[] sampleDataFromColumn( + JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate) + throws SQLException { + final String minQuery = + String.format( + "SELECT %s FROM %s WHERE (%s - (SELECT MIN(%s) FROM %s)) %% %s = 0 ORDER BY %s", + quote(columnName), + quote(tableId), + quote(columnName), + quote(columnName), + quote(tableId), + inverseSamplingRate, + quote(columnName)); + return jdbc.queryAndMap( + minQuery, + resultSet -> { + List results = new ArrayList<>(); + while (resultSet.next()) { + results.add(resultSet.getObject(1)); + } + return results.toArray(); + }); + } + /** * Returns the next LSN to be read from the database. This is the LSN of the last record that * was read from the database. 
From be5d9a7689f16c3ef01e47a890300f7cfaa4a396 Mon Sep 17 00:00:00 2001 From: liuli Date: Tue, 30 May 2023 01:03:46 +0800 Subject: [PATCH 2/3] add doc --- docs/en/connector-v2/source/MySQL-CDC.md | 21 +++++++++++++++++++- docs/en/connector-v2/source/SqlServer-CDC.md | 21 +++++++++++++++++++- 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/docs/en/connector-v2/source/MySQL-CDC.md b/docs/en/connector-v2/source/MySQL-CDC.md index 071bbbe0bc3..c197371efd0 100644 --- a/docs/en/connector-v2/source/MySQL-CDC.md +++ b/docs/en/connector-v2/source/MySQL-CDC.md @@ -41,8 +41,10 @@ describes how to set up the MySQL CDC connector to run SQL queries against MySQL | connect.timeout.ms | Duration | No | 30000 | | connect.max-retries | Integer | No | 3 | | connection.pool.size | Integer | No | 20 | -| chunk-key.even-distribution.factor.upper-bound | Double | No | 1000 | +| chunk-key.even-distribution.factor.upper-bound | Double | No | 100 | | chunk-key.even-distribution.factor.lower-bound | Double | No | 0.05 | +| sample-sharding.threshold | Integer | No | 1000 | +| inverse-sampling.rate | Integer | No | 1000 | | debezium.* | config | No | - | | format | Enum | No | DEFAULT | | common-options | | no | - | @@ -124,6 +126,23 @@ of table. The maximum fetch size for per poll when read table snapshot. +### chunk-key.even-distribution.factor.upper-bound [Double] + +The upper bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. If the distribution factor (i.e., (MAX(id) - MIN(id) + 1) / row count) is calculated to be less than or equal to this upper bound, the table chunks would be optimized for even distribution. Otherwise, if the distribution factor is greater, the table will be considered as unevenly distributed and the sampling-based sharding strategy will be used if the estimated shard count exceeds the value specified by `sample-sharding.threshold`. The default value is 100.0. 
+ +### chunk-key.even-distribution.factor.lower-bound [Double] + +The lower bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. If the distribution factor (i.e., (MAX(id) - MIN(id) + 1) / row count) is calculated to be greater than or equal to this lower bound, the table chunks would be optimized for even distribution. Otherwise, if the distribution factor is less, the table will be considered as unevenly distributed and the sampling-based sharding strategy will be used if the estimated shard count exceeds the value specified by `sample-sharding.threshold`. The default value is 0.05. + +### sample-sharding.threshold [Integer] + +This configuration specifies the threshold of estimated shard count to trigger the sample sharding strategy. When the distribution factor is outside the bounds specified by `chunk-key.even-distribution.factor.upper-bound` and `chunk-key.even-distribution.factor.lower-bound`, and the estimated shard count (calculated as approximate row count / chunk size) exceeds this threshold, the sample sharding strategy will be used. This can help to handle large datasets more efficiently. The default value is 1000 shards. + +### inverse-sampling.rate [Integer] + +The inverse of the sampling rate used in the sample sharding strategy. For example, if this value is set to 1000, it means a 1/1000 sampling rate is applied during the sampling process. This option provides flexibility in controlling the granularity of the sampling, thus affecting the final number of shards. It's especially useful when dealing with very large datasets where a lower sampling rate is preferred. The default value is 1000. 
+ ### server-id [String] A numeric ID or a numeric ID range of this database client, The numeric ID syntax is like '5400', the numeric ID range diff --git a/docs/en/connector-v2/source/SqlServer-CDC.md b/docs/en/connector-v2/source/SqlServer-CDC.md index 5b310a9471c..fb7c0b39b5b 100644 --- a/docs/en/connector-v2/source/SqlServer-CDC.md +++ b/docs/en/connector-v2/source/SqlServer-CDC.md @@ -40,8 +40,10 @@ describes how to setup the SqlServer CDC connector to run SQL queries against Sq | connect.timeout | Duration | No | 30s | | connect.max-retries | Integer | No | 3 | | connection.pool.size | Integer | No | 20 | -| chunk-key.even-distribution.factor.upper-bound | Double | No | 1000 | +| chunk-key.even-distribution.factor.upper-bound | Double | No | 100 | | chunk-key.even-distribution.factor.lower-bound | Double | No | 0.05 | +| sample-sharding.threshold | Integer | No | 1000 | +| inverse-sampling.rate | Integer | No | 1000 | | debezium.* | config | No | - | | format | Enum | No | DEFAULT | | common-options | | no | - | @@ -123,6 +125,23 @@ of table. The maximum fetch size for per poll when read table snapshot. +### chunk-key.even-distribution.factor.upper-bound [Double] + +The upper bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. If the distribution factor (i.e., (MAX(id) - MIN(id) + 1) / row count) is calculated to be less than or equal to this upper bound, the table chunks would be optimized for even distribution. Otherwise, if the distribution factor is greater, the table will be considered as unevenly distributed and the sampling-based sharding strategy will be used if the estimated shard count exceeds the value specified by `sample-sharding.threshold`. The default value is 100.0. + +### chunk-key.even-distribution.factor.lower-bound [Double] + +The lower bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. 
If the distribution factor (i.e., (MAX(id) - MIN(id) + 1) / row count) is calculated to be greater than or equal to this lower bound, the table chunks would be optimized for even distribution. Otherwise, if the distribution factor is less, the table will be considered as unevenly distributed and the sampling-based sharding strategy will be used if the estimated shard count exceeds the value specified by `sample-sharding.threshold`. The default value is 0.05. + +### sample-sharding.threshold [Integer] + +This configuration specifies the threshold of estimated shard count to trigger the sample sharding strategy. When the distribution factor is outside the bounds specified by `chunk-key.even-distribution.factor.upper-bound` and `chunk-key.even-distribution.factor.lower-bound`, and the estimated shard count (calculated as approximate row count / chunk size) exceeds this threshold, the sample sharding strategy will be used. This can help to handle large datasets more efficiently. The default value is 1000 shards. + +### inverse-sampling.rate [Integer] + +The inverse of the sampling rate used in the sample sharding strategy. For example, if this value is set to 1000, it means a 1/1000 sampling rate is applied during the sampling process. This option provides flexibility in controlling the granularity of the sampling, thus affecting the final number of shards. It's especially useful when dealing with very large datasets where a lower sampling rate is preferred. The default value is 1000. + + ### server-time-zone [String] The session time zone in database server. 
From 258c307f70fb89bc39ecdf6f8483982326c61cde Mon Sep 17 00:00:00 2001 From: liuli Date: Tue, 30 May 2023 09:58:31 +0800 Subject: [PATCH 3/3] fix checkstyle --- docs/en/connector-v2/source/MySQL-CDC.md | 1 - docs/en/connector-v2/source/SqlServer-CDC.md | 1 - 2 files changed, 2 deletions(-) diff --git a/docs/en/connector-v2/source/MySQL-CDC.md b/docs/en/connector-v2/source/MySQL-CDC.md index c197371efd0..ff89fae574e 100644 --- a/docs/en/connector-v2/source/MySQL-CDC.md +++ b/docs/en/connector-v2/source/MySQL-CDC.md @@ -142,7 +142,6 @@ This configuration specifies the threshold of estimated shard count to trigger t The inverse of the sampling rate used in the sample sharding strategy. For example, if this value is set to 1000, it means a 1/1000 sampling rate is applied during the sampling process. This option provides flexibility in controlling the granularity of the sampling, thus affecting the final number of shards. It's especially useful when dealing with very large datasets where a lower sampling rate is preferred. The default value is 1000. - ### server-id [String] A numeric ID or a numeric ID range of this database client, The numeric ID syntax is like '5400', the numeric ID range diff --git a/docs/en/connector-v2/source/SqlServer-CDC.md b/docs/en/connector-v2/source/SqlServer-CDC.md index fb7c0b39b5b..562682742b6 100644 --- a/docs/en/connector-v2/source/SqlServer-CDC.md +++ b/docs/en/connector-v2/source/SqlServer-CDC.md @@ -141,7 +141,6 @@ This configuration specifies the threshold of estimated shard count to trigger t The inverse of the sampling rate used in the sample sharding strategy. For example, if this value is set to 1000, it means a 1/1000 sampling rate is applied during the sampling process. This option provides flexibility in controlling the granularity of the sampling, thus affecting the final number of shards. It's especially useful when dealing with very large datasets where a lower sampling rate is preferred. The default value is 1000. 
- ### server-time-zone [String] The session time zone in database server.