From e951847287d234cd72988565cc145b95217cb3b0 Mon Sep 17 00:00:00 2001 From: ddna1021 Date: Fri, 7 Jul 2023 20:01:58 +0800 Subject: [PATCH 1/7] make data block balance before importing data #5026 support spark and flink engine If you need to use this feature, add "partition_balance = true" in the env configuration, the default value is false --- .../apache/seatunnel/api/common/CommonOptions.java | 9 +++++++++ .../flink/execution/SinkExecuteProcessor.java | 11 +++++++++++ .../spark/execution/SinkExecuteProcessor.java | 12 ++++++++++++ 3 files changed, 32 insertions(+) diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/CommonOptions.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/CommonOptions.java index e8c6f78781f..59495bb9786 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/CommonOptions.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/CommonOptions.java @@ -66,4 +66,13 @@ public interface CommonOptions { .withDescription( "When parallelism is not specified, the parallelism in env is used by default. " + "When parallelism is specified, it will override the parallelism in env."); + + Option PARTITION_BALANCE = + Options.key("partition_balance") + .booleanType() + .defaultValue(false) + .withDescription( + "When partition_balance is set to true, " + + "in the sink process, a repartition will be performed first to ensure that the size of each partition is roughly the same, " + + "which can avoid problems caused by data skew, but it will consume some extra time. The default value is false"); } diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java index d8fa8eeddff..022b4edfa24 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java @@ -115,6 +115,17 @@ public List> execute(List> upstreamDataStreams) DataSaveMode dataSaveMode = saveModeSink.getDataSaveMode(); saveModeSink.handleSaveMode(dataSaveMode); } + Boolean needBalanceInEnv = + flinkRuntimeEnvironment + .getConfig() + .getBoolean(CommonOptions.PARTITION_BALANCE.key()); + boolean needBalance = + needBalanceInEnv != null + ? needBalanceInEnv + : CommonOptions.PARTITION_BALANCE.defaultValue(); + if (needBalance) { + stream = stream.shuffle(); + } DataStreamSink dataStreamSink = stream.sinkTo(SinkV1Adapter.wrap(new FlinkSink<>(seaTunnelSink))) .name(seaTunnelSink.getPluginName()); diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java index f4d3c0b15b5..2eee3da9e58 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java @@ -110,6 +110,18 @@ public List> execute(List> upstreamDataStreams) CommonOptions.PARALLELISM.key(), CommonOptions.PARALLELISM.defaultValue()); } + Boolean needBalanceInEnv = + sparkRuntimeEnvironment + .getConfig() + .getBoolean(CommonOptions.PARTITION_BALANCE.key()); + boolean needBalance = + needBalanceInEnv != null + ? needBalanceInEnv + : CommonOptions.PARTITION_BALANCE.defaultValue(); + + if (needBalance) { + dataset = dataset.repartition(parallelism); + } dataset.sparkSession().read().option(CommonOptions.PARALLELISM.key(), parallelism); // TODO modify checkpoint location seaTunnelSink.setTypeInfo( From 8693bf10a4599f09bef3e4d956e723daa3a1c1f1 Mon Sep 17 00:00:00 2001 From: fandonglai Date: Mon, 10 Jul 2023 20:51:49 +0800 Subject: [PATCH 2/7] doc for partition_balance #5026 --- docs/en/connector-v2/sink/common-options.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/docs/en/connector-v2/sink/common-options.md b/docs/en/connector-v2/sink/common-options.md index 2addc49278d..f14a42a2e66 100644 --- a/docs/en/connector-v2/sink/common-options.md +++ b/docs/en/connector-v2/sink/common-options.md @@ -6,6 +6,8 @@ |-------------------|--------|----------|---------------| | source_table_name | string | no | - | | parallelism | int | no | - | +| partition_balance | boolean| no | false | + ### source_table_name [string] @@ -19,6 +21,16 @@ When `parallelism` is not specified, the `parallelism` in env is used by default When parallelism is specified, it will override the parallelism in env. +### partition_balance [boolean] +When `partition_balance` is set to true, in the sink process, a repartition will be performed first to ensure that the size of each partition is roughly the same, which can avoid problems caused by data skew, but it will consume some extra time. + +The default value is false, support Spark and Flink engine + +When `partition_balance` is not specified, the `partition_balance` in env is used by default. + +When `partition_balance` is specified, it will override the `partition_balance` in env. + + ## Examples ```bash From 35bb0def3ce9c37153dd713f7eb7d507b82bcd1a Mon Sep 17 00:00:00 2001 From: fandonglai Date: Tue, 11 Jul 2023 14:07:41 +0800 Subject: [PATCH 3/7] doc for partition_balance #5026 --- docs/en/connector-v2/sink/common-options.md | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/docs/en/connector-v2/sink/common-options.md b/docs/en/connector-v2/sink/common-options.md index f14a42a2e66..6b42fa34bfc 100644 --- a/docs/en/connector-v2/sink/common-options.md +++ b/docs/en/connector-v2/sink/common-options.md @@ -2,12 +2,11 @@ > Common parameters of sink connectors -| name | type | required | default value | -|-------------------|--------|----------|---------------| -| source_table_name | string | no | - | -| parallelism | int | no | - | -| partition_balance | boolean| no | false | - +| name | type | required | default value | +|-------------------|---------|----------|---------------| +| source_table_name | string | no | - | +| parallelism | int | no | - | +| partition_balance | boolean | no | false | ### source_table_name [string] @@ -22,7 +21,8 @@ When `parallelism` is not specified, the `parallelism` in env is used by default When parallelism is specified, it will override the parallelism in env. ### partition_balance [boolean] -When `partition_balance` is set to true, in the sink process, a repartition will be performed first to ensure that the size of each partition is roughly the same, which can avoid problems caused by data skew, but it will consume some extra time. + +When `partition_balance` is set to true, in the sink process, a repartition will be performed first to ensure that the size of each partition is roughly the same, which can avoid problems caused by data skew, but it will consume some extra time. The default value is false, support Spark and Flink engine @@ -30,7 +30,6 @@ When `partition_balance` is not specified, the `partition_balance` in env is use When `partition_balance` is specified, it will override the `partition_balance` in env. - ## Examples ```bash From a6b106965c0257ef7fe1ab84395a633210603e16 Mon Sep 17 00:00:00 2001 From: fandonglai Date: Tue, 11 Jul 2023 16:26:34 +0800 Subject: [PATCH 4/7] doc for partition_balance --- docs/en/connector-v2/sink/common-options.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/connector-v2/sink/common-options.md b/docs/en/connector-v2/sink/common-options.md index 6b42fa34bfc..fe8e059c87d 100644 --- a/docs/en/connector-v2/sink/common-options.md +++ b/docs/en/connector-v2/sink/common-options.md @@ -24,7 +24,7 @@ When parallelism is specified, it will override the parallelism in env. When `partition_balance` is set to true, in the sink process, a repartition will be performed first to ensure that the size of each partition is roughly the same, which can avoid problems caused by data skew, but it will consume some extra time. -The default value is false, support Spark and Flink engine +The default value is false, only supported by Spark and Flink engine When `partition_balance` is not specified, the `partition_balance` in env is used by default. From bdab8d625a9f14ee4e96a47e1b16ba6a76dbbc0a Mon Sep 17 00:00:00 2001 From: fandonglai Date: Wed, 19 Jul 2023 22:46:51 +0800 Subject: [PATCH 5/7] Change the way for getting configuration --- docs/en/connector-v2/sink/common-options.md | 2 +- .../flink/execution/SinkExecuteProcessor.java | 15 +++++---------- .../spark/execution/SinkExecuteProcessor.java | 16 +++++----------- 3 files changed, 11 insertions(+), 22 deletions(-) diff --git a/docs/en/connector-v2/sink/common-options.md b/docs/en/connector-v2/sink/common-options.md index fe8e059c87d..6b42fa34bfc 100644 --- a/docs/en/connector-v2/sink/common-options.md +++ b/docs/en/connector-v2/sink/common-options.md @@ -24,7 +24,7 @@ When parallelism is specified, it will override the parallelism in env. When `partition_balance` is set to true, in the sink process, a repartition will be performed first to ensure that the size of each partition is roughly the same, which can avoid problems caused by data skew, but it will consume some extra time. -The default value is false, only supported by Spark and Flink engine +The default value is false, support Spark and Flink engine When `partition_balance` is not specified, the `partition_balance` in env is used by default. diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java index 022b4edfa24..084310369da 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java @@ -115,16 +115,11 @@ public List> execute(List> upstreamDataStreams) DataSaveMode dataSaveMode = saveModeSink.getDataSaveMode(); saveModeSink.handleSaveMode(dataSaveMode); } - Boolean needBalanceInEnv = - flinkRuntimeEnvironment - .getConfig() - .getBoolean(CommonOptions.PARTITION_BALANCE.key()); - boolean needBalance = - needBalanceInEnv != null - ? needBalanceInEnv - : CommonOptions.PARTITION_BALANCE.defaultValue(); - if (needBalance) { - stream = stream.shuffle(); + if (sinkConfig.hasPath(CommonOptions.PARTITION_BALANCE.key())) { + Boolean needBalance = sinkConfig.getBoolean(CommonOptions.PARTITION_BALANCE.key()); + if (needBalance) { + stream = stream.shuffle(); + } } DataStreamSink dataStreamSink = stream.sinkTo(SinkV1Adapter.wrap(new FlinkSink<>(seaTunnelSink))) diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java index 2eee3da9e58..84dfc11d215 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java @@ -110,17 +110,11 @@ public List> execute(List> upstreamDataStreams) CommonOptions.PARALLELISM.key(), CommonOptions.PARALLELISM.defaultValue()); } - Boolean needBalanceInEnv = - sparkRuntimeEnvironment - .getConfig() - .getBoolean(CommonOptions.PARTITION_BALANCE.key()); - boolean needBalance = - needBalanceInEnv != null - ? needBalanceInEnv - : CommonOptions.PARTITION_BALANCE.defaultValue(); - - if (needBalance) { - dataset = dataset.repartition(parallelism); + if (sinkConfig.hashPath(CommonOptions.PARTITION_BALANCE.key())) { + boolean needBalance = sinkConfig.getBoolean(); + if (needBalance) { + dataset = dataset.repartition(parallelism); + } } dataset.sparkSession().read().option(CommonOptions.PARALLELISM.key(), parallelism); // TODO modify checkpoint location From 112d1ce540acca07e52fee9383d3e3c0d88dca9f Mon Sep 17 00:00:00 2001 From: fandonglai Date: Thu, 20 Jul 2023 11:36:40 +0800 Subject: [PATCH 6/7] Change the way for getting configuration --- .../core/starter/spark/execution/SinkExecuteProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java index 84dfc11d215..b8afc05ff53 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java @@ -111,7 +111,7 @@ public List> execute(List> upstreamDataStreams) CommonOptions.PARALLELISM.defaultValue()); } if (sinkConfig.hashPath(CommonOptions.PARTITION_BALANCE.key())) { - boolean needBalance = sinkConfig.getBoolean(); + boolean needBalance = sinkConfig.getBoolean(CommonOptions.PARTITION_BALANCE.key()); if (needBalance) { dataset = dataset.repartition(parallelism); } From 29485a9a22f8c1263bd112d6977ae1741e4b5a07 Mon Sep 17 00:00:00 2001 From: fandonglai Date: Mon, 7 Aug 2023 12:56:24 +0800 Subject: [PATCH 7/7] fix coding error --- .../core/starter/spark/execution/SinkExecuteProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java index b8afc05ff53..cf84cbeadb6 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java @@ -110,7 +110,7 @@ public List> execute(List> upstreamDataStreams) CommonOptions.PARALLELISM.key(), CommonOptions.PARALLELISM.defaultValue()); } - if (sinkConfig.hashPath(CommonOptions.PARTITION_BALANCE.key())) { + if (sinkConfig.hasPath(CommonOptions.PARTITION_BALANCE.key())) { boolean needBalance = sinkConfig.getBoolean(CommonOptions.PARTITION_BALANCE.key()); if (needBalance) { dataset = dataset.repartition(parallelism);