From d145880addd9d9f3ff781b69130ce82ffa61b560 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Tue, 16 Apr 2024 09:58:33 +0300 Subject: [PATCH 1/2] Add input partition number check --- .../enforce_distribution.rs | 3 +- datafusion/sqllogictest/test_files/joins.slt | 98 +++++++++++++++++++ 2 files changed, 100 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 145f08af76dd..c9c54a46bd1c 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -875,7 +875,8 @@ fn add_hash_on_top( n_target: usize, ) -> Result { // Early return if hash repartition is unnecessary - if n_target == 1 { + // `RepartitionExec: partitioning=Hash([...], 1), input_partitions=1` is unnecessary. + if n_target == 1 && input.plan.output_partitioning().partition_count() == 1 { return Ok(input); } diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 04d6e46caf5e..a945362dc417 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -3587,3 +3587,101 @@ SELECT 1 FROM join_partitioned_table JOIN (SELECT c1 AS id1 FROM join_partitione 1 1 1 + + +statement ok +set datafusion.explain.logical_plan_only = false; + +statement ok +set datafusion.execution.target_partitions = 2; + +query TT +EXPLAIN SELECT * FROM ( + SELECT 1 as c, 2 as d + UNION ALL + SELECT 1 as c, 3 AS d +) as a FULL JOIN (SELECT 1 as e, 3 AS f) AS rhs ON a.c=rhs.e; +---- +logical_plan +01)Projection: a.c, a.d, rhs.e, rhs.f +02)--Full Join: a.c = rhs.e +03)----SubqueryAlias: a +04)------Union +05)--------Projection: Int64(1) AS c, Int64(2) AS d +06)----------EmptyRelation +07)--------Projection: Int64(1) AS c, Int64(3) AS d +08)----------EmptyRelation +09)----SubqueryAlias: rhs +10)------Projection: Int64(1) AS e, Int64(3) AS f +11)--------EmptyRelation +physical_plan +01)ProjectionExec: expr=[c@2 as c, d@3 as d, e@0 as e, f@1 as f] +02)--CoalesceBatchesExec: target_batch_size=2 +03)----HashJoinExec: mode=Partitioned, join_type=Full, on=[(e@0, c@0)] +04)------CoalesceBatchesExec: target_batch_size=2 +05)--------RepartitionExec: partitioning=Hash([e@0], 2), input_partitions=1 +06)----------ProjectionExec: expr=[1 as e, 3 as f] +07)------------PlaceholderRowExec +08)------CoalesceBatchesExec: target_batch_size=2 +09)--------RepartitionExec: partitioning=Hash([c@0], 2), input_partitions=2 +10)----------UnionExec +11)------------ProjectionExec: expr=[1 as c, 2 as d] +12)--------------PlaceholderRowExec +13)------------ProjectionExec: expr=[1 as c, 3 as d] +14)--------------PlaceholderRowExec + +query IIII +SELECT * FROM ( + SELECT 1 as c, 2 as d + UNION ALL + SELECT 1 as c, 3 AS d +) as a FULL JOIN (SELECT 1 as e, 3 AS f) AS rhs ON a.c=rhs.e; +---- +1 2 1 3 +1 3 1 3 + +statement ok +set datafusion.execution.target_partitions = 1; + +query TT +EXPLAIN SELECT * FROM ( + SELECT 1 as c, 2 as d + UNION ALL + SELECT 1 as c, 3 AS d +) as a FULL JOIN (SELECT 1 as e, 3 AS f) AS rhs ON a.c=rhs.e; +---- +logical_plan +01)Projection: a.c, a.d, rhs.e, rhs.f +02)--Full Join: a.c = rhs.e +03)----SubqueryAlias: a +04)------Union +05)--------Projection: Int64(1) AS c, Int64(2) AS d +06)----------EmptyRelation +07)--------Projection: Int64(1) AS c, Int64(3) AS d +08)----------EmptyRelation +09)----SubqueryAlias: rhs +10)------Projection: Int64(1) AS e, Int64(3) AS f +11)--------EmptyRelation +physical_plan +01)ProjectionExec: expr=[c@2 as c, d@3 as d, e@0 as e, f@1 as f] +02)--CoalesceBatchesExec: target_batch_size=2 +03)----HashJoinExec: mode=Partitioned, join_type=Full, on=[(e@0, c@0)] +04)------ProjectionExec: expr=[1 as e, 3 as f] +05)--------PlaceholderRowExec +06)------CoalesceBatchesExec: target_batch_size=2 +07)--------RepartitionExec: partitioning=Hash([c@0], 1), input_partitions=2 +08)----------UnionExec +09)------------ProjectionExec: expr=[1 as c, 2 as d] +10)--------------PlaceholderRowExec +11)------------ProjectionExec: expr=[1 as c, 3 as d] +12)--------------PlaceholderRowExec + +query IIII +SELECT * FROM ( + SELECT 1 as c, 2 as d + UNION ALL + SELECT 1 as c, 3 AS d +) as a FULL JOIN (SELECT 1 as e, 3 AS f) AS rhs ON a.c=rhs.e; +---- +1 2 1 3 +1 3 1 3 From e61499347b899da38e2eb2f583c54640f0515a91 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Tue, 16 Apr 2024 10:11:44 +0300 Subject: [PATCH 2/2] Minor changes --- datafusion/sqllogictest/test_files/joins.slt | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index a945362dc417..0fe73b2c9522 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -3592,9 +3592,6 @@ SELECT 1 FROM join_partitioned_table JOIN (SELECT c1 AS id1 FROM join_partitione statement ok set datafusion.explain.logical_plan_only = false; -statement ok -set datafusion.execution.target_partitions = 2; - query TT EXPLAIN SELECT * FROM ( SELECT 1 as c, 2 as d @@ -3685,3 +3682,9 @@ SELECT * FROM ( ---- 1 2 1 3 1 3 1 3 + +statement ok +set datafusion.explain.logical_plan_only = true; + +statement ok +set datafusion.execution.target_partitions = 2;