Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor of Ordering and Prunability Traversals and States #7985

Merged
merged 3 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 31 additions & 28 deletions datafusion/physical-expr/src/sort_properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,45 +155,44 @@ impl Neg for SortProperties {
#[derive(Debug)]
pub struct ExprOrdering {
pub expr: Arc<dyn PhysicalExpr>,
pub state: Option<SortProperties>,
pub children_states: Option<Vec<SortProperties>>,
pub state: SortProperties,
pub children_states: Vec<SortProperties>,
}

impl ExprOrdering {
/// Creates a new [`ExprOrdering`] with [`SortProperties::Unordered`] states
/// for `expr` and its children.
pub fn new(expr: Arc<dyn PhysicalExpr>) -> Self {
let size = expr.children().len();
Self {
expr,
state: None,
children_states: None,
state: SortProperties::Unordered,
children_states: vec![SortProperties::Unordered; size],
}
}

pub fn children(&self) -> Vec<ExprOrdering> {
/// Updates this [`ExprOrdering`]'s children states with the given states.
pub fn with_new_children(mut self, children_states: Vec<SortProperties>) -> Self {
self.children_states = children_states;
self
}

/// Creates new [`ExprOrdering`] objects for each child of the expression.
pub fn children_expr_orderings(&self) -> Vec<ExprOrdering> {
self.expr
.children()
.into_iter()
.map(ExprOrdering::new)
.collect()
}

pub fn new_with_children(
children_states: Vec<SortProperties>,
parent_expr: Arc<dyn PhysicalExpr>,
) -> Self {
Self {
expr: parent_expr,
state: None,
children_states: Some(children_states),
}
}
}

impl TreeNode for ExprOrdering {
fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in self.children() {
for child in self.children_expr_orderings() {
match op(&child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
Expand All @@ -207,17 +206,20 @@ impl TreeNode for ExprOrdering {
where
F: FnMut(Self) -> Result<Self>,
{
let children = self.children();
if children.is_empty() {
if self.children_states.is_empty() {
Ok(self)
} else {
Ok(ExprOrdering::new_with_children(
children
let child_expr_orderings = self.children_expr_orderings();
// After mapping over the children, the function `F` applies to the
// current object and updates its state.
Ok(self.with_new_children(
child_expr_orderings
.into_iter()
// Update children states after this transformation:
.map(transform)
.map_ok(|c| c.state.unwrap_or(SortProperties::Unordered))
// Extract the state (i.e. sort properties) information:
.map_ok(|c| c.state)
.collect::<Result<Vec<_>>>()?,
self.expr,
))
}
}
Expand Down Expand Up @@ -248,13 +250,13 @@ pub fn update_ordering(
// a BinaryExpr like a + b), and there is an ordering equivalence of
// it (let's say like c + d), we actually can find it at this step.
if sort_expr.expr.eq(&node.expr) {
node.state = Some(SortProperties::Ordered(sort_expr.options));
node.state = SortProperties::Ordered(sort_expr.options);
return Ok(Transformed::Yes(node));
}

if let Some(children_sort_options) = &node.children_states {
if !node.expr.children().is_empty() {
// We have an intermediate (non-leaf) node, account for its children:
node.state = Some(node.expr.get_ordering(children_sort_options));
node.state = node.expr.get_ordering(&node.children_states);
} else if let Some(column) = node.expr.as_any().downcast_ref::<Column>() {
// We have a Column, which is one of the two possible leaf node types:
node.state = get_indices_of_matching_sort_exprs_with_order_eq(
Expand All @@ -268,10 +270,11 @@ pub fn update_ordering(
descending: sort_options[0].descending,
nulls_first: sort_options[0].nulls_first,
})
});
})
.unwrap_or(SortProperties::Unordered);
} else {
// We have a Literal, which is the other possible leaf node type:
node.state = Some(node.expr.get_ordering(&[]));
node.state = node.expr.get_ordering(&[]);
}
Ok(Transformed::Yes(node))
}
90 changes: 89 additions & 1 deletion datafusion/physical-expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ pub fn find_orderings_of_exprs(
&input_ordering_equal_properties,
)
})?;
if let Some(SortProperties::Ordered(sort_options)) = transformed.state {
if let SortProperties::Ordered(sort_options) = transformed.state {
orderings.push(Some(PhysicalSortExpr {
expr: Arc::new(Column::new(name, index)),
options: sort_options,
Expand Down Expand Up @@ -1836,4 +1836,92 @@ mod tests {

Ok(())
}

#[test]
fn test_find_orderings_of_exprs() -> Result<()> {
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
Field::new("c", DataType::Int32, true),
Field::new("d", DataType::Int32, true),
]);

let mut eq = EquivalenceProperties::new(Arc::new(schema.clone()));
let col_a = &col("a", &schema)?;
let col_b = &col("b", &schema)?;
let col_c = &col("c", &schema)?;
let col_d = &col("d", &schema)?;
let option_asc = SortOptions {
descending: false,
nulls_first: false,
};
// b=a (e.g they are aliases)
eq.add_equal_conditions((&Column::new("b", 1), &Column::new("a", 0)));
let mut oeq = OrderingEquivalenceProperties::new(Arc::new(schema.clone()));
// [b ASC], [d ASC]
oeq.add_equal_conditions((
&vec![PhysicalSortExpr {
expr: col_b.clone(),
options: option_asc,
}],
&vec![PhysicalSortExpr {
expr: col_d.clone(),
options: option_asc,
}],
));

let orderings = find_orderings_of_exprs(
&[
// d + b
(
Arc::new(BinaryExpr::new(
col_d.clone(),
Operator::Plus,
col_b.clone(),
)),
"d+b".to_string(),
),
// b as b_new
(col_b.clone(), "b_new".to_string()),
// a as a_new
(col_a.clone(), "a_new".to_string()),
// a + c
(
Arc::new(BinaryExpr::new(
col_a.clone(),
Operator::Plus,
col_c.clone(),
)),
"a+c".to_string(),
),
],
Some(&[PhysicalSortExpr {
expr: col_b.clone(),
options: option_asc,
}]),
eq,
oeq,
)?;

assert_eq!(
vec![
Some(PhysicalSortExpr {
expr: Arc::new(Column::new("d+b", 0)),
options: option_asc,
}),
Some(PhysicalSortExpr {
expr: Arc::new(Column::new("b_new", 1)),
options: option_asc,
}),
Some(PhysicalSortExpr {
expr: Arc::new(Column::new("a_new", 2)),
options: option_asc,
}),
None,
],
orderings
);

Ok(())
}
}