Skip to content

Commit

Permalink
Refactor of Ordering and Prunability Traversals and States (#7985)
Browse files Browse the repository at this point in the history
* simplify ExprOrdering

* Comment improvements

* Move map/transform comment up

---------

Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>
  • Loading branch information
berkaysynnada and ozankabak authored Oct 30, 2023
1 parent 448dff5 commit 0d4dc36
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 29 deletions.
59 changes: 31 additions & 28 deletions datafusion/physical-expr/src/sort_properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,45 +155,44 @@ impl Neg for SortProperties {
#[derive(Debug)]
pub struct ExprOrdering {
pub expr: Arc<dyn PhysicalExpr>,
pub state: Option<SortProperties>,
pub children_states: Option<Vec<SortProperties>>,
pub state: SortProperties,
pub children_states: Vec<SortProperties>,
}

impl ExprOrdering {
/// Creates a new [`ExprOrdering`] with [`SortProperties::Unordered`] states
/// for `expr` and its children.
pub fn new(expr: Arc<dyn PhysicalExpr>) -> Self {
let size = expr.children().len();
Self {
expr,
state: None,
children_states: None,
state: SortProperties::Unordered,
children_states: vec![SortProperties::Unordered; size],
}
}

pub fn children(&self) -> Vec<ExprOrdering> {
/// Updates this [`ExprOrdering`]'s children states with the given states.
pub fn with_new_children(mut self, children_states: Vec<SortProperties>) -> Self {
self.children_states = children_states;
self
}

/// Creates new [`ExprOrdering`] objects for each child of the expression.
pub fn children_expr_orderings(&self) -> Vec<ExprOrdering> {
self.expr
.children()
.into_iter()
.map(ExprOrdering::new)
.collect()
}

pub fn new_with_children(
children_states: Vec<SortProperties>,
parent_expr: Arc<dyn PhysicalExpr>,
) -> Self {
Self {
expr: parent_expr,
state: None,
children_states: Some(children_states),
}
}
}

impl TreeNode for ExprOrdering {
fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in self.children() {
for child in self.children_expr_orderings() {
match op(&child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
Expand All @@ -207,17 +206,20 @@ impl TreeNode for ExprOrdering {
where
F: FnMut(Self) -> Result<Self>,
{
let children = self.children();
if children.is_empty() {
if self.children_states.is_empty() {
Ok(self)
} else {
Ok(ExprOrdering::new_with_children(
children
let child_expr_orderings = self.children_expr_orderings();
// After mapping over the children, the function `F` applies to the
// current object and updates its state.
Ok(self.with_new_children(
child_expr_orderings
.into_iter()
// Update children states after this transformation:
.map(transform)
.map_ok(|c| c.state.unwrap_or(SortProperties::Unordered))
// Extract the state (i.e. sort properties) information:
.map_ok(|c| c.state)
.collect::<Result<Vec<_>>>()?,
self.expr,
))
}
}
Expand Down Expand Up @@ -248,13 +250,13 @@ pub fn update_ordering(
// a BinaryExpr like a + b), and there is an ordering equivalence of
// it (let's say like c + d), we actually can find it at this step.
if sort_expr.expr.eq(&node.expr) {
node.state = Some(SortProperties::Ordered(sort_expr.options));
node.state = SortProperties::Ordered(sort_expr.options);
return Ok(Transformed::Yes(node));
}

if let Some(children_sort_options) = &node.children_states {
if !node.expr.children().is_empty() {
// We have an intermediate (non-leaf) node, account for its children:
node.state = Some(node.expr.get_ordering(children_sort_options));
node.state = node.expr.get_ordering(&node.children_states);
} else if let Some(column) = node.expr.as_any().downcast_ref::<Column>() {
// We have a Column, which is one of the two possible leaf node types:
node.state = get_indices_of_matching_sort_exprs_with_order_eq(
Expand All @@ -268,10 +270,11 @@ pub fn update_ordering(
descending: sort_options[0].descending,
nulls_first: sort_options[0].nulls_first,
})
});
})
.unwrap_or(SortProperties::Unordered);
} else {
// We have a Literal, which is the other possible leaf node type:
node.state = Some(node.expr.get_ordering(&[]));
node.state = node.expr.get_ordering(&[]);
}
Ok(Transformed::Yes(node))
}
90 changes: 89 additions & 1 deletion datafusion/physical-expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ pub fn find_orderings_of_exprs(
&input_ordering_equal_properties,
)
})?;
if let Some(SortProperties::Ordered(sort_options)) = transformed.state {
if let SortProperties::Ordered(sort_options) = transformed.state {
orderings.push(Some(PhysicalSortExpr {
expr: Arc::new(Column::new(name, index)),
options: sort_options,
Expand Down Expand Up @@ -1836,4 +1836,92 @@ mod tests {

Ok(())
}

#[test]
fn test_find_orderings_of_exprs() -> Result<()> {
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
Field::new("c", DataType::Int32, true),
Field::new("d", DataType::Int32, true),
]);

let mut eq = EquivalenceProperties::new(Arc::new(schema.clone()));
let col_a = &col("a", &schema)?;
let col_b = &col("b", &schema)?;
let col_c = &col("c", &schema)?;
let col_d = &col("d", &schema)?;
let option_asc = SortOptions {
descending: false,
nulls_first: false,
};
// b=a (e.g they are aliases)
eq.add_equal_conditions((&Column::new("b", 1), &Column::new("a", 0)));
let mut oeq = OrderingEquivalenceProperties::new(Arc::new(schema.clone()));
// [b ASC], [d ASC]
oeq.add_equal_conditions((
&vec![PhysicalSortExpr {
expr: col_b.clone(),
options: option_asc,
}],
&vec![PhysicalSortExpr {
expr: col_d.clone(),
options: option_asc,
}],
));

let orderings = find_orderings_of_exprs(
&[
// d + b
(
Arc::new(BinaryExpr::new(
col_d.clone(),
Operator::Plus,
col_b.clone(),
)),
"d+b".to_string(),
),
// b as b_new
(col_b.clone(), "b_new".to_string()),
// a as a_new
(col_a.clone(), "a_new".to_string()),
// a + c
(
Arc::new(BinaryExpr::new(
col_a.clone(),
Operator::Plus,
col_c.clone(),
)),
"a+c".to_string(),
),
],
Some(&[PhysicalSortExpr {
expr: col_b.clone(),
options: option_asc,
}]),
eq,
oeq,
)?;

assert_eq!(
vec![
Some(PhysicalSortExpr {
expr: Arc::new(Column::new("d+b", 0)),
options: option_asc,
}),
Some(PhysicalSortExpr {
expr: Arc::new(Column::new("b_new", 1)),
options: option_asc,
}),
Some(PhysicalSortExpr {
expr: Arc::new(Column::new("a_new", 2)),
options: option_asc,
}),
None,
],
orderings
);

Ok(())
}
}

0 comments on commit 0d4dc36

Please sign in to comment.