Skip to content

Commit

Permalink
Serialize preserve_partitioning in SortExec (apache#5306)
Browse files Browse the repository at this point in the history
  • Loading branch information
thinkharderdev authored and jiangzhx committed Feb 24, 2023
1 parent 7ae66ce commit 96cee56
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 1 deletion.
1 change: 1 addition & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1266,6 +1266,7 @@ message SortExecNode {
repeated PhysicalExprNode expr = 2;
// Maximum number of highest/lowest rows to fetch; negative means no limit
int64 fetch = 3;
bool preserve_partitioning = 4;
}

message SortPreservingMergeExecNode {
Expand Down
18 changes: 18 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 44 additions & 1 deletion datafusion/proto/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,12 @@ impl AsExecutionPlan for PhysicalPlanNode {
} else {
Some(sort.fetch as usize)
};
Ok(Arc::new(SortExec::try_new(exprs, input, fetch)?))
Ok(Arc::new(SortExec::new_with_partitioning(
exprs,
input,
sort.preserve_partitioning,
fetch,
)))
}
PhysicalPlanType::SortPreservingMerge(sort) => {
let input: Arc<dyn ExecutionPlan> =
Expand Down Expand Up @@ -1043,6 +1048,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
Some(n) => n as i64,
_ => -1,
},
preserve_partitioning: exec.preserve_partitioning(),
},
))),
})
Expand Down Expand Up @@ -1441,6 +1447,43 @@ mod roundtrip_tests {
)?))
}

#[test]
fn roundtrip_sort_preserve_partitioning() -> Result<()> {
let field_a = Field::new("a", DataType::Boolean, false);
let field_b = Field::new("b", DataType::Int64, false);
let schema = Arc::new(Schema::new(vec![field_a, field_b]));
let sort_exprs = vec![
PhysicalSortExpr {
expr: col("a", &schema)?,
options: SortOptions {
descending: true,
nulls_first: false,
},
},
PhysicalSortExpr {
expr: col("b", &schema)?,
options: SortOptions {
descending: false,
nulls_first: true,
},
},
];

roundtrip_test(Arc::new(SortExec::new_with_partitioning(
sort_exprs.clone(),
Arc::new(EmptyExec::new(false, schema.clone())),
false,
None,
)))?;

roundtrip_test(Arc::new(SortExec::new_with_partitioning(
sort_exprs,
Arc::new(EmptyExec::new(false, schema)),
true,
None,
)))
}

#[test]
fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> {
let scan_config = FileScanConfig {
Expand Down

0 comments on commit 96cee56

Please sign in to comment.