From b914409f862caf3f28700bc4df3c17258f7abede Mon Sep 17 00:00:00 2001 From: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com> Date: Tue, 16 Apr 2024 16:33:14 +0300 Subject: [PATCH] [Bug Fix]: Deem hash repartition unnecessary when input and output has 1 partition (#10095) * Add input partition number check * Minor changes --- .../enforce_distribution.rs | 3 +- datafusion/sqllogictest/test_files/joins.slt | 101 ++++++++++++++++++ 2 files changed, 103 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 145f08af76dd..c9c54a46bd1c 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -875,7 +875,8 @@ fn add_hash_on_top( n_target: usize, ) -> Result { // Early return if hash repartition is unnecessary - if n_target == 1 { + // `RepartitionExec: partitioning=Hash([...], 1), input_partitions=1` is unnecessary. + if n_target == 1 && input.plan.output_partitioning().partition_count() == 1 { return Ok(input); } diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 04d6e46caf5e..0fe73b2c9522 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -3587,3 +3587,104 @@ SELECT 1 FROM join_partitioned_table JOIN (SELECT c1 AS id1 FROM join_partitione 1 1 1 + + +statement ok +set datafusion.explain.logical_plan_only = false; + +query TT +EXPLAIN SELECT * FROM ( + SELECT 1 as c, 2 as d + UNION ALL + SELECT 1 as c, 3 AS d +) as a FULL JOIN (SELECT 1 as e, 3 AS f) AS rhs ON a.c=rhs.e; +---- +logical_plan +01)Projection: a.c, a.d, rhs.e, rhs.f +02)--Full Join: a.c = rhs.e +03)----SubqueryAlias: a +04)------Union +05)--------Projection: Int64(1) AS c, Int64(2) AS d +06)----------EmptyRelation +07)--------Projection: Int64(1) AS c, Int64(3) AS d +08)----------EmptyRelation +09)----SubqueryAlias: rhs +10)------Projection: Int64(1) AS e, Int64(3) AS f +11)--------EmptyRelation +physical_plan +01)ProjectionExec: expr=[c@2 as c, d@3 as d, e@0 as e, f@1 as f] +02)--CoalesceBatchesExec: target_batch_size=2 +03)----HashJoinExec: mode=Partitioned, join_type=Full, on=[(e@0, c@0)] +04)------CoalesceBatchesExec: target_batch_size=2 +05)--------RepartitionExec: partitioning=Hash([e@0], 2), input_partitions=1 +06)----------ProjectionExec: expr=[1 as e, 3 as f] +07)------------PlaceholderRowExec +08)------CoalesceBatchesExec: target_batch_size=2 +09)--------RepartitionExec: partitioning=Hash([c@0], 2), input_partitions=2 +10)----------UnionExec +11)------------ProjectionExec: expr=[1 as c, 2 as d] +12)--------------PlaceholderRowExec +13)------------ProjectionExec: expr=[1 as c, 3 as d] +14)--------------PlaceholderRowExec + +query IIII +SELECT * FROM ( + SELECT 1 as c, 2 as d + UNION ALL + SELECT 1 as c, 3 AS d +) as a FULL JOIN (SELECT 1 as e, 3 AS f) AS rhs ON a.c=rhs.e; +---- +1 2 1 3 +1 3 1 3 + +statement ok +set datafusion.execution.target_partitions = 1; + +query TT +EXPLAIN SELECT * FROM ( + SELECT 1 as c, 2 as d + UNION ALL + SELECT 1 as c, 3 AS d +) as a FULL JOIN (SELECT 1 as e, 3 AS f) AS rhs ON a.c=rhs.e; +---- +logical_plan +01)Projection: a.c, a.d, rhs.e, rhs.f +02)--Full Join: a.c = rhs.e +03)----SubqueryAlias: a +04)------Union +05)--------Projection: Int64(1) AS c, Int64(2) AS d +06)----------EmptyRelation +07)--------Projection: Int64(1) AS c, Int64(3) AS d +08)----------EmptyRelation +09)----SubqueryAlias: rhs +10)------Projection: Int64(1) AS e, Int64(3) AS f +11)--------EmptyRelation +physical_plan +01)ProjectionExec: expr=[c@2 as c, d@3 as d, e@0 as e, f@1 as f] +02)--CoalesceBatchesExec: target_batch_size=2 +03)----HashJoinExec: mode=Partitioned, join_type=Full, on=[(e@0, c@0)] +04)------ProjectionExec: expr=[1 as e, 3 as f] +05)--------PlaceholderRowExec +06)------CoalesceBatchesExec: target_batch_size=2 +07)--------RepartitionExec: partitioning=Hash([c@0], 1), input_partitions=2 +08)----------UnionExec +09)------------ProjectionExec: expr=[1 as c, 2 as d] +10)--------------PlaceholderRowExec +11)------------ProjectionExec: expr=[1 as c, 3 as d] +12)--------------PlaceholderRowExec + +query IIII +SELECT * FROM ( + SELECT 1 as c, 2 as d + UNION ALL + SELECT 1 as c, 3 AS d +) as a FULL JOIN (SELECT 1 as e, 3 AS f) AS rhs ON a.c=rhs.e; +---- +1 2 1 3 +1 3 1 3 + +statement ok +set datafusion.explain.logical_plan_only = true; + +statement ok +set datafusion.execution.target_partitions = 2;