[improve][CDC base] Implement Sample-based Sharding Strategy with Configurable Sampling Rate #4856

Merged: 3 commits, Jun 5, 2023
20 changes: 19 additions & 1 deletion docs/en/connector-v2/source/MySQL-CDC.md
@@ -41,8 +41,10 @@ describes how to set up the MySQL CDC connector to run SQL queries against MySQL
| connect.timeout.ms | Duration | No | 30000 |
| connect.max-retries | Integer | No | 3 |
| connection.pool.size | Integer | No | 20 |
| chunk-key.even-distribution.factor.upper-bound | Double | No | 1000 |
| chunk-key.even-distribution.factor.upper-bound | Double | No | 100 |
| chunk-key.even-distribution.factor.lower-bound | Double | No | 0.05 |
| sample-sharding.threshold | int | No | 1000 |
| inverse-sampling.rate | int | No | 1000 |
| debezium.* | config | No | - |
| format | Enum | No | DEFAULT |
| common-options | | no | - |
@@ -124,6 +126,22 @@ of table.

The maximum fetch size for each poll when reading the table snapshot.

### chunk-key.even-distribution.factor.upper-bound [Double]

The upper bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. The distribution factor is calculated as (MAX(id) - MIN(id) + 1) / row count. If the factor is less than or equal to this upper bound, the table chunks are split with the even-distribution optimization; otherwise, the table is considered unevenly distributed, and the sample-based sharding strategy is used if the estimated shard count exceeds the value specified by `sample-sharding.threshold`. The default value is 100.0.

### chunk-key.even-distribution.factor.lower-bound [Double]

The lower bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. The distribution factor is calculated as (MAX(id) - MIN(id) + 1) / row count. If the factor is greater than or equal to this lower bound, the table chunks are split with the even-distribution optimization; otherwise, the table is considered unevenly distributed, and the sample-based sharding strategy is used if the estimated shard count exceeds the value specified by `sample-sharding.threshold`. The default value is 0.05.

### sample-sharding.threshold [Integer]

This option specifies the threshold of estimated shard count that triggers the sample sharding strategy. When the distribution factor falls outside the bounds specified by `chunk-key.even-distribution.factor.upper-bound` and `chunk-key.even-distribution.factor.lower-bound`, and the estimated shard count (calculated as approximate row count / chunk size) exceeds this threshold, the sample sharding strategy is used. This helps handle large datasets more efficiently. The default value is 1000 shards.

### inverse-sampling.rate [Integer]

The inverse of the sampling rate used in the sample sharding strategy. For example, if this value is set to 1000, it means a 1/1000 sampling rate is applied during the sampling process. This option provides flexibility in controlling the granularity of the sampling, thus affecting the final number of shards. It's especially useful when dealing with very large datasets where a lower sampling rate is preferred. The default value is 1000.
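
A brief worked example of how these options interact (all numbers, including the chunk size of 8096, are illustrative): suppose a table has MIN(id) = 1, MAX(id) = 2,000,000,000, and an approximate row count of 10,000,000. The distribution factor is (2,000,000,000 - 1 + 1) / 10,000,000 = 200, which falls outside the default bounds of [0.05, 100], so the table is treated as unevenly distributed. The estimated shard count is 10,000,000 / 8096 ≈ 1235, which exceeds the default `sample-sharding.threshold` of 1000, so the sample sharding strategy is applied: with the default `inverse-sampling.rate` of 1000, roughly 10,000 chunk-key values are sampled and used to derive the chunk boundaries.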

### server-id [String]

A numeric ID or a numeric ID range of this database client. The numeric ID syntax is like '5400', the numeric ID range
20 changes: 19 additions & 1 deletion docs/en/connector-v2/source/SqlServer-CDC.md
@@ -40,8 +40,10 @@ describes how to setup the SqlServer CDC connector to run SQL queries against Sq
| connect.timeout | Duration | No | 30s |
| connect.max-retries | Integer | No | 3 |
| connection.pool.size | Integer | No | 20 |
| chunk-key.even-distribution.factor.upper-bound | Double | No | 1000 |
| chunk-key.even-distribution.factor.upper-bound | Double | No | 100 |
| chunk-key.even-distribution.factor.lower-bound | Double | No | 0.05 |
| sample-sharding.threshold | int | No | 1000 |
| inverse-sampling.rate | int | No | 1000 |
| debezium.* | config | No | - |
| format | Enum | No | DEFAULT |
| common-options | | no | - |
@@ -123,6 +125,22 @@ of table.

The maximum fetch size for each poll when reading the table snapshot.

### chunk-key.even-distribution.factor.upper-bound [Double]

The upper bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. The distribution factor is calculated as (MAX(id) - MIN(id) + 1) / row count. If the factor is less than or equal to this upper bound, the table chunks are split with the even-distribution optimization; otherwise, the table is considered unevenly distributed, and the sample-based sharding strategy is used if the estimated shard count exceeds the value specified by `sample-sharding.threshold`. The default value is 100.0.

### chunk-key.even-distribution.factor.lower-bound [Double]

The lower bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. The distribution factor is calculated as (MAX(id) - MIN(id) + 1) / row count. If the factor is greater than or equal to this lower bound, the table chunks are split with the even-distribution optimization; otherwise, the table is considered unevenly distributed, and the sample-based sharding strategy is used if the estimated shard count exceeds the value specified by `sample-sharding.threshold`. The default value is 0.05.

### sample-sharding.threshold [Integer]

This option specifies the threshold of estimated shard count that triggers the sample sharding strategy. When the distribution factor falls outside the bounds specified by `chunk-key.even-distribution.factor.upper-bound` and `chunk-key.even-distribution.factor.lower-bound`, and the estimated shard count (calculated as approximate row count / chunk size) exceeds this threshold, the sample sharding strategy is used. This helps handle large datasets more efficiently. The default value is 1000 shards.

### inverse-sampling.rate [Integer]

The inverse of the sampling rate used in the sample sharding strategy. For example, if this value is set to 1000, it means a 1/1000 sampling rate is applied during the sampling process. This option provides flexibility in controlling the granularity of the sampling, thus affecting the final number of shards. It's especially useful when dealing with very large datasets where a lower sampling rate is preferred. The default value is 1000.
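
For instance (illustrative numbers), with `inverse-sampling.rate` = 1000 a table of roughly 50,000,000 rows yields about 50,000 sampled chunk-key values once the sample sharding strategy is triggered; raising the value to 10000 shrinks the sample to about 5,000 values, reducing the sampling cost at the expense of coarser chunk boundaries.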

### server-time-zone [String]

The session time zone in the database server.
@@ -37,6 +37,8 @@ public abstract class BaseSourceConfig implements SourceConfig {

@Getter protected final double distributionFactorUpper;
@Getter protected final double distributionFactorLower;
@Getter protected final int sampleShardingThreshold;
@Getter protected final int inverseSamplingRate;

// --------------------------------------------------------------------------------------------
// Debezium Configurations
@@ -49,12 +51,16 @@ public BaseSourceConfig(
int splitSize,
double distributionFactorUpper,
double distributionFactorLower,
int sampleShardingThreshold,
int inverseSamplingRate,
Properties dbzProperties) {
this.startupConfig = startupConfig;
this.stopConfig = stopConfig;
this.splitSize = splitSize;
this.distributionFactorUpper = distributionFactorUpper;
this.distributionFactorLower = distributionFactorLower;
this.sampleShardingThreshold = sampleShardingThreshold;
this.inverseSamplingRate = inverseSamplingRate;
this.dbzProperties = dbzProperties;
}

@@ -51,6 +51,8 @@ public JdbcSourceConfig(
int splitSize,
double distributionFactorUpper,
double distributionFactorLower,
int sampleShardingThreshold,
int inverseSamplingRate,
Properties dbzProperties,
String driverClassName,
String hostname,
@@ -69,6 +71,8 @@
splitSize,
distributionFactorUpper,
distributionFactorLower,
sampleShardingThreshold,
inverseSamplingRate,
dbzProperties);
this.driverClassName = driverClassName;
this.hostname = hostname;
@@ -42,8 +42,13 @@ public abstract class JdbcSourceConfigFactory implements SourceConfig.Factory<Jd
protected StartupConfig startupConfig;
protected StopConfig stopConfig;
protected boolean includeSchemaChanges = false;
protected double distributionFactorUpper = 1000.0d;
protected double distributionFactorLower = 0.05d;
protected double distributionFactorUpper =
JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue();
protected double distributionFactorLower =
JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue();
protected int sampleShardingThreshold =
JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD.defaultValue();
protected int inverseSamplingRate = JdbcSourceOptions.INVERSE_SAMPLING_RATE.defaultValue();
protected int splitSize = SourceOptions.SNAPSHOT_SPLIT_SIZE.defaultValue();
protected int fetchSize = SourceOptions.SNAPSHOT_FETCH_SIZE.defaultValue();
protected String serverTimeZone = JdbcSourceOptions.SERVER_TIME_ZONE.defaultValue();
@@ -139,6 +144,34 @@ public JdbcSourceConfigFactory distributionFactorLower(double distributionFactor
return this;
}

/**
 * The threshold of estimated shard count that triggers the sample-based sharding strategy.
 * When the distribution factor is outside the upper and lower bounds and the estimated shard
 * count (approximate row count / chunk size) exceeds this threshold, the sample-based
 * sharding strategy will be used. This can help to handle large datasets in a more efficient
 * manner.
 *
 * @param sampleShardingThreshold The threshold of estimated shard count.
 * @return This JdbcSourceConfigFactory.
 */
public JdbcSourceConfigFactory sampleShardingThreshold(int sampleShardingThreshold) {
this.sampleShardingThreshold = sampleShardingThreshold;
return this;
}

/**
* The inverse of the sampling rate to be used for data sharding based on sampling. The actual
* sampling rate is 1 / inverseSamplingRate. For instance, if inverseSamplingRate is 1000, then
* the sampling rate is 1/1000, meaning every 1000th record will be included in the sample used
* for sharding.
*
* @param inverseSamplingRate The value representing the inverse of the desired sampling rate.
* @return this JdbcSourceConfigFactory instance.
*/
public JdbcSourceConfigFactory inverseSamplingRate(int inverseSamplingRate) {
this.inverseSamplingRate = inverseSamplingRate;
return this;
}
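
A minimal usage sketch of the two new builder methods (illustrative only: the `MySqlSourceConfigFactory` subclass is assumed from the rest of this PR, and the connection settings a real job needs are omitted):

```java
// Illustrative sketch, not part of this diff: how the new options plug into the
// builder-style factory. Connection settings (hostname, username, ...) are omitted.
JdbcSourceConfigFactory factory =
        new MySqlSourceConfigFactory()
                .sampleShardingThreshold(1000) // switch to sampling at ~1000 estimated shards
                .inverseSamplingRate(1000);    // sample 1/1000 of the chunk-key values
JdbcSourceConfig config = factory.create(0);   // build the config for subtask 0
```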

/** The maximum fetch size for each poll when reading the table snapshot. */
public JdbcSourceConfigFactory fetchSize(int fetchSize) {
this.fetchSize = fetchSize;
@@ -201,6 +234,8 @@ public JdbcSourceConfigFactory fromReadonlyConfig(ReadonlyConfig config) {
config.get(JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
this.distributionFactorLower =
config.get(JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
this.sampleShardingThreshold = config.get(JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD);
this.inverseSamplingRate = config.get(JdbcSourceOptions.INVERSE_SAMPLING_RATE);
this.splitSize = config.get(SourceOptions.SNAPSHOT_SPLIT_SIZE);
this.fetchSize = config.get(SourceOptions.SNAPSHOT_FETCH_SIZE);
this.serverTimeZone = config.get(JdbcSourceOptions.SERVER_TIME_ZONE);
@@ -100,7 +100,7 @@ public class JdbcSourceOptions extends SourceOptions {
public static final Option<Double> CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND =
Options.key("chunk-key.even-distribution.factor.upper-bound")
.doubleType()
.defaultValue(1000.0d)
.defaultValue(100.0d)
.withDescription(
"The upper bound of chunk key distribution factor. The distribution factor is used to determine whether the"
+ " table is evenly distribution or not."
@@ -118,4 +118,25 @@
+ " The table chunks would use evenly calculation optimization when the data distribution is even,"
+ " and the query for splitting would happen when it is uneven."
+ " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount.");

public static final Option<Integer> SAMPLE_SHARDING_THRESHOLD =
Options.key("sample-sharding.threshold")
.intType()
.defaultValue(1000) // 1000 shards
.withDescription(
"The threshold of estimated shard count to trigger the sample sharding strategy. "
+ "When the distribution factor is outside the upper and lower bounds, "
+ "and if the estimated shard count (approximateRowCnt/chunkSize) exceeds this threshold, "
+ "the sample sharding strategy will be used. "
+ "This strategy can help to handle large datasets more efficiently. "
+ "The default value is 1000 shards.");
public static final Option<Integer> INVERSE_SAMPLING_RATE =
Options.key("inverse-sampling.rate")
.intType()
.defaultValue(1000) // 1/1000 sampling rate
.withDescription(
"The inverse of the sampling rate for the sample sharding strategy. "
+ "The value represents the denominator of the sampling rate fraction. "
+ "For example, a value of 1000 means a sampling rate of 1/1000. "
+ "This parameter is used when the sample sharding strategy is triggered.");
}
@@ -62,6 +62,22 @@ Object queryMin(
JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound)
throws SQLException;

/**
* Performs a sampling operation on the specified column of a table in a JDBC-connected
* database.
*
* @param jdbc The JDBC connection object used to connect to the database.
* @param tableId The ID of the table in which the column resides.
* @param columnName The name of the column to be sampled.
* @param samplingRate The inverse of the fraction of the data to be sampled from the column.
*     For example, a value of 1000 would mean 1/1000 of the data will be sampled.
* @return Returns a List of sampled data from the specified column.
* @throws SQLException If an SQL error occurs during the sampling operation.
*/
Object[] sampleDataFromColumn(
JdbcConnection jdbc, TableId tableId, String columnName, int samplingRate)
throws SQLException;
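
One possible way to satisfy this contract, shown only as an illustrative sketch (the dialect-specific implementations in this PR may instead push the sampling into SQL), is a client-side every-Nth sampler over an ordered scan of the chunk-key column:

```java
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

final class EveryNthSampler {
    /**
     * Keeps every samplingRate-th value from an ordered result set of chunk-key values,
     * e.g. a samplingRate of 1000 keeps roughly 1/1000 of the rows.
     */
    static Object[] sample(ResultSet orderedChunkKeys, int samplingRate) throws SQLException {
        List<Object> sampled = new ArrayList<>();
        long index = 0;
        while (orderedChunkKeys.next()) {
            if (index++ % samplingRate == 0) {
                sampled.add(orderedChunkKeys.getObject(1));
            }
        }
        return sampled.toArray();
    }
}
```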

/**
* Query the maximum value of the next chunk, and the next chunk must be greater than or equal
* to <code>includedLowerBound</code> value [min_1, max_1), [min_2, max_2),... [min_n, null).
@@ -43,6 +43,8 @@ public MySqlSourceConfig(
int splitSize,
double distributionFactorUpper,
double distributionFactorLower,
int sampleShardingThreshold,
int inverseSamplingRate,
Properties dbzProperties,
String driverClassName,
String hostname,
@@ -63,6 +65,8 @@
splitSize,
distributionFactorUpper,
distributionFactorLower,
sampleShardingThreshold,
inverseSamplingRate,
dbzProperties,
driverClassName,
hostname,
@@ -117,6 +117,8 @@ public MySqlSourceConfig create(int subtaskId) {
splitSize,
distributionFactorUpper,
distributionFactorLower,
sampleShardingThreshold,
inverseSamplingRate,
props,
driverClassName,
hostname,