From 8216e32e87b2238d8814fe16215c8770d6c327c8 Mon Sep 17 00:00:00 2001 From: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com> Date: Thu, 27 Jun 2024 17:10:31 +0300 Subject: [PATCH] [MINOR]: Fix some minor silent bugs (#11127) --- datafusion/core/tests/fifo/mod.rs | 59 ++++++++++--------- datafusion/core/tests/tpcds_planning.rs | 5 +- datafusion/physical-expr/src/partitioning.rs | 4 +- .../physical-plan/src/aggregates/mod.rs | 2 +- 4 files changed, 38 insertions(+), 32 deletions(-) diff --git a/datafusion/core/tests/fifo/mod.rs b/datafusion/core/tests/fifo/mod.rs index 2e21abffab87..1df97b1636c7 100644 --- a/datafusion/core/tests/fifo/mod.rs +++ b/datafusion/core/tests/fifo/mod.rs @@ -217,17 +217,6 @@ mod unix_test { .set_bool("datafusion.execution.coalesce_batches", false) .with_target_partitions(1); let ctx = SessionContext::new_with_config(config); - // Tasks - let mut tasks: Vec> = vec![]; - - // Join filter - let a1_iter = 0..TEST_DATA_SIZE; - // Join key - let a2_iter = (0..TEST_DATA_SIZE).map(|x| x % 10); - let lines = a1_iter - .zip(a2_iter) - .map(|(a1, a2)| format!("{a1},{a2}\n")) - .collect::>(); // Create a new temporary FIFO file let tmp_dir = TempDir::new()?; @@ -238,22 +227,6 @@ mod unix_test { // Create a mutex for tracking if the right input source is waiting for data. let waiting = Arc::new(AtomicBool::new(true)); - // Create writing threads for the left and right FIFO files - tasks.push(create_writing_thread( - left_fifo.clone(), - "a1,a2\n".to_owned(), - lines.clone(), - waiting.clone(), - TEST_BATCH_SIZE, - )); - tasks.push(create_writing_thread( - right_fifo.clone(), - "a1,a2\n".to_owned(), - lines.clone(), - waiting.clone(), - TEST_BATCH_SIZE, - )); - // Create schema let schema = Arc::new(Schema::new(vec![ Field::new("a1", DataType::UInt32, false), @@ -264,10 +237,10 @@ mod unix_test { let order = vec![vec![datafusion_expr::col("a1").sort(true, false)]]; // Set unbounded sorted files read configuration - let provider = fifo_table(schema.clone(), left_fifo, order.clone()); + let provider = fifo_table(schema.clone(), left_fifo.clone(), order.clone()); ctx.register_table("left", provider)?; - let provider = fifo_table(schema.clone(), right_fifo, order); + let provider = fifo_table(schema.clone(), right_fifo.clone(), order); ctx.register_table("right", provider)?; // Execute the query, with no matching rows. (since key is modulus 10) @@ -287,6 +260,34 @@ mod unix_test { .await?; let mut stream = df.execute_stream().await?; let mut operations = vec![]; + + // Tasks + let mut tasks: Vec> = vec![]; + + // Join filter + let a1_iter = 0..TEST_DATA_SIZE; + // Join key + let a2_iter = (0..TEST_DATA_SIZE).map(|x| x % 10); + let lines = a1_iter + .zip(a2_iter) + .map(|(a1, a2)| format!("{a1},{a2}\n")) + .collect::>(); + + // Create writing threads for the left and right FIFO files + tasks.push(create_writing_thread( + left_fifo, + "a1,a2\n".to_owned(), + lines.clone(), + waiting.clone(), + TEST_BATCH_SIZE, + )); + tasks.push(create_writing_thread( + right_fifo, + "a1,a2\n".to_owned(), + lines.clone(), + waiting.clone(), + TEST_BATCH_SIZE, + )); // Partial. while let Some(Ok(batch)) = stream.next().await { waiting.store(false, Ordering::SeqCst); diff --git a/datafusion/core/tests/tpcds_planning.rs b/datafusion/core/tests/tpcds_planning.rs index 44fb0afff319..b99bc2680044 100644 --- a/datafusion/core/tests/tpcds_planning.rs +++ b/datafusion/core/tests/tpcds_planning.rs @@ -1044,7 +1044,10 @@ async fn regression_test(query_no: u8, create_physical: bool) -> Result<()> { for table in &tables { ctx.register_table( table.name.as_str(), - Arc::new(MemTable::try_new(Arc::new(table.schema.clone()), vec![])?), + Arc::new(MemTable::try_new( + Arc::new(table.schema.clone()), + vec![vec![]], + )?), )?; } diff --git a/datafusion/physical-expr/src/partitioning.rs b/datafusion/physical-expr/src/partitioning.rs index fcb3278b6022..273c77fb1d5e 100644 --- a/datafusion/physical-expr/src/partitioning.rs +++ b/datafusion/physical-expr/src/partitioning.rs @@ -152,6 +152,8 @@ impl Partitioning { match required { Distribution::UnspecifiedDistribution => true, Distribution::SinglePartition if self.partition_count() == 1 => true, + // When partition count is 1, hash requirement is satisfied. + Distribution::HashPartitioned(_) if self.partition_count() == 1 => true, Distribution::HashPartitioned(required_exprs) => { match self { // Here we do not check the partition count for hash partitioning and assumes the partition count @@ -290,7 +292,7 @@ mod tests { assert_eq!(result, (true, false, false, false, false)) } Distribution::HashPartitioned(_) => { - assert_eq!(result, (false, false, false, true, false)) + assert_eq!(result, (true, false, false, true, false)) } } } diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 4c187f03f36b..533d10357b0e 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -675,7 +675,7 @@ impl ExecutionPlan for AggregateExec { vec![Distribution::UnspecifiedDistribution] } AggregateMode::FinalPartitioned | AggregateMode::SinglePartitioned => { - vec![Distribution::HashPartitioned(self.output_group_expr())] + vec![Distribution::HashPartitioned(self.group_by.input_exprs())] } AggregateMode::Final | AggregateMode::Single => { vec![Distribution::SinglePartition]