Skip to content

Commit

Permalink
[MINOR]: Fix some minor silent bugs (#11127)
Browse files Browse the repository at this point in the history
  • Loading branch information
mustafasrepo authored Jun 27, 2024
1 parent ff116c3 commit 8216e32
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 32 deletions.
59 changes: 30 additions & 29 deletions datafusion/core/tests/fifo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,17 +217,6 @@ mod unix_test {
.set_bool("datafusion.execution.coalesce_batches", false)
.with_target_partitions(1);
let ctx = SessionContext::new_with_config(config);
// Tasks
let mut tasks: Vec<JoinHandle<()>> = vec![];

// Join filter
let a1_iter = 0..TEST_DATA_SIZE;
// Join key
let a2_iter = (0..TEST_DATA_SIZE).map(|x| x % 10);
let lines = a1_iter
.zip(a2_iter)
.map(|(a1, a2)| format!("{a1},{a2}\n"))
.collect::<Vec<_>>();

// Create a new temporary FIFO file
let tmp_dir = TempDir::new()?;
Expand All @@ -238,22 +227,6 @@ mod unix_test {
// Create a mutex for tracking if the right input source is waiting for data.
let waiting = Arc::new(AtomicBool::new(true));

// Create writing threads for the left and right FIFO files
tasks.push(create_writing_thread(
left_fifo.clone(),
"a1,a2\n".to_owned(),
lines.clone(),
waiting.clone(),
TEST_BATCH_SIZE,
));
tasks.push(create_writing_thread(
right_fifo.clone(),
"a1,a2\n".to_owned(),
lines.clone(),
waiting.clone(),
TEST_BATCH_SIZE,
));

// Create schema
let schema = Arc::new(Schema::new(vec![
Field::new("a1", DataType::UInt32, false),
Expand All @@ -264,10 +237,10 @@ mod unix_test {
let order = vec![vec![datafusion_expr::col("a1").sort(true, false)]];

// Set unbounded sorted files read configuration
let provider = fifo_table(schema.clone(), left_fifo, order.clone());
let provider = fifo_table(schema.clone(), left_fifo.clone(), order.clone());
ctx.register_table("left", provider)?;

let provider = fifo_table(schema.clone(), right_fifo, order);
let provider = fifo_table(schema.clone(), right_fifo.clone(), order);
ctx.register_table("right", provider)?;

// Execute the query, with no matching rows. (since key is modulus 10)
Expand All @@ -287,6 +260,34 @@ mod unix_test {
.await?;
let mut stream = df.execute_stream().await?;
let mut operations = vec![];

// Tasks
let mut tasks: Vec<JoinHandle<()>> = vec![];

// Join filter
let a1_iter = 0..TEST_DATA_SIZE;
// Join key
let a2_iter = (0..TEST_DATA_SIZE).map(|x| x % 10);
let lines = a1_iter
.zip(a2_iter)
.map(|(a1, a2)| format!("{a1},{a2}\n"))
.collect::<Vec<_>>();

// Create writing threads for the left and right FIFO files
tasks.push(create_writing_thread(
left_fifo,
"a1,a2\n".to_owned(),
lines.clone(),
waiting.clone(),
TEST_BATCH_SIZE,
));
tasks.push(create_writing_thread(
right_fifo,
"a1,a2\n".to_owned(),
lines.clone(),
waiting.clone(),
TEST_BATCH_SIZE,
));
// Partial.
while let Some(Ok(batch)) = stream.next().await {
waiting.store(false, Ordering::SeqCst);
Expand Down
5 changes: 4 additions & 1 deletion datafusion/core/tests/tpcds_planning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1044,7 +1044,10 @@ async fn regression_test(query_no: u8, create_physical: bool) -> Result<()> {
for table in &tables {
ctx.register_table(
table.name.as_str(),
Arc::new(MemTable::try_new(Arc::new(table.schema.clone()), vec![])?),
Arc::new(MemTable::try_new(
Arc::new(table.schema.clone()),
vec![vec![]],
)?),
)?;
}

Expand Down
4 changes: 3 additions & 1 deletion datafusion/physical-expr/src/partitioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ impl Partitioning {
match required {
Distribution::UnspecifiedDistribution => true,
Distribution::SinglePartition if self.partition_count() == 1 => true,
// When partition count is 1, hash requirement is satisfied.
Distribution::HashPartitioned(_) if self.partition_count() == 1 => true,
Distribution::HashPartitioned(required_exprs) => {
match self {
// Here we do not check the partition count for hash partitioning and assumes the partition count
Expand Down Expand Up @@ -290,7 +292,7 @@ mod tests {
assert_eq!(result, (true, false, false, false, false))
}
Distribution::HashPartitioned(_) => {
assert_eq!(result, (false, false, false, true, false))
assert_eq!(result, (true, false, false, true, false))
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,7 @@ impl ExecutionPlan for AggregateExec {
vec![Distribution::UnspecifiedDistribution]
}
AggregateMode::FinalPartitioned | AggregateMode::SinglePartitioned => {
vec![Distribution::HashPartitioned(self.output_group_expr())]
vec![Distribution::HashPartitioned(self.group_by.input_exprs())]
}
AggregateMode::Final | AggregateMode::Single => {
vec![Distribution::SinglePartition]
Expand Down

0 comments on commit 8216e32

Please sign in to comment.