Skip to content

Commit

Permalink
Implement qualified expression alias and extend test coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
gruuya committed Nov 10, 2023
1 parent 8467736 commit 8ebdc13
Show file tree
Hide file tree
Showing 16 changed files with 134 additions and 36 deletions.
35 changes: 31 additions & 4 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::Operator;
use crate::{aggregate_function, ExprSchemable};
use arrow::datatypes::DataType;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{internal_err, DFSchema};
use datafusion_common::{internal_err, DFSchema, OwnedTableReference};
use datafusion_common::{plan_err, Column, DataFusionError, Result, ScalarValue};
use std::collections::HashSet;
use std::fmt;
Expand Down Expand Up @@ -172,7 +172,7 @@ pub enum Expr {
/// plan into physical plan.
Wildcard,
/// Represents a reference to all available fields in a specific schema.
///
///
/// This expr has to be resolved to a list of columns before translating logical
/// plan into physical plan.
QualifiedWildcard { qualifier: String },
Expand All @@ -191,13 +191,20 @@ pub enum Expr {
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct Alias {
pub expr: Box<Expr>,
pub relation: Option<OwnedTableReference>,
pub name: String,
}

impl Alias {
pub fn new(expr: Expr, name: impl Into<String>) -> Self {
/// Create an alias with an optional schema/field qualifier.
pub fn new(
expr: Expr,
relation: Option<impl Into<OwnedTableReference>>,
name: impl Into<String>,
) -> Self {
Self {
expr: Box::new(expr),
relation: relation.map(|r| r.into()),
name: name.into(),
}
}
Expand Down Expand Up @@ -849,7 +856,27 @@ impl Expr {
asc,
nulls_first,
}) => Expr::Sort(Sort::new(Box::new(expr.alias(name)), asc, nulls_first)),
_ => Expr::Alias(Alias::new(self, name.into())),
_ => Expr::Alias(Alias::new(self, None::<&str>, name.into())),
}
}

/// Return `self AS name` alias expression with a specific qualifier
pub fn alias_qualified(
self,
relation: Option<impl Into<OwnedTableReference>>,
name: impl Into<String>,
) -> Expr {
match self {
Expr::Sort(Sort {
expr,
asc,
nulls_first,
}) => Expr::Sort(Sort::new(
Box::new(expr.alias_qualified(relation, name)),
asc,
nulls_first,
)),
_ => Expr::Alias(Alias::new(self, relation, name.into())),
}
}

Expand Down
7 changes: 7 additions & 0 deletions datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,13 @@ impl ExprSchemable for Expr {
self.nullable(input_schema)?,
)
.with_metadata(self.metadata(input_schema)?)),
Expr::Alias(Alias { relation, name, .. }) => Ok(DFField::new(
relation.clone(),
name,
self.get_type(input_schema)?,
self.nullable(input_schema)?,
)
.with_metadata(self.metadata(input_schema)?)),
_ => Ok(DFField::new_unqualified(
&self.display_name()?,
self.get_type(input_schema)?,
Expand Down
17 changes: 4 additions & 13 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2219,19 +2219,10 @@ impl DistinctOn {

let on_expr = normalize_cols(on_expr, input.as_ref())?;

// Create fields with any qualifier stuffed in the name itself
let fields = exprlist_to_fields(&select_expr, &input)?
.iter()
.map(|f| {
DFField::new_unqualified(
&f.qualified_name(),
f.data_type().clone(),
f.is_nullable(),
)
})
.collect();
let schema =
DFSchema::new_with_metadata(fields, input.schema().metadata().clone())?;
let schema = DFSchema::new_with_metadata(
exprlist_to_fields(&select_expr, &input)?,
input.schema().metadata().clone(),
)?;

let mut distinct_on = DistinctOn {
on_expr,
Expand Down
8 changes: 5 additions & 3 deletions datafusion/expr/src/tree_node/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,11 @@ impl TreeNode for Expr {
let mut transform = transform;

Ok(match self {
Expr::Alias(Alias { expr, name, .. }) => {
Expr::Alias(Alias::new(transform(*expr)?, name))
}
Expr::Alias(Alias {
expr,
relation,
name,
}) => Expr::Alias(Alias::new(transform(*expr)?, relation, name)),
Expr::Column(_) => self,
Expr::OuterReferenceColumn(_, _) => self,
Expr::Exists { .. } => self,
Expand Down
8 changes: 5 additions & 3 deletions datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -801,9 +801,11 @@ pub fn columnize_expr(e: Expr, input_schema: &DFSchema) -> Expr {
match e {
Expr::Column(_) => e,
Expr::OuterReferenceColumn(_, _) => e,
Expr::Alias(Alias { expr, name, .. }) => {
columnize_expr(*expr, input_schema).alias(name)
}
Expr::Alias(Alias {
expr,
relation,
name,
}) => columnize_expr(*expr, input_schema).alias_qualified(relation, name),
Expr::Cast(Cast { expr, data_type }) => Expr::Cast(Cast {
expr: Box::new(columnize_expr(*expr, input_schema)),
data_type,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ impl Optimizer {
/// Returns an error if plans have different schemas.
///
/// It ignores metadata and nullability.
fn assert_schema_is_the_same(
pub(crate) fn assert_schema_is_the_same(
rule_name: &str,
prev_plan: &LogicalPlan,
new_plan: &LogicalPlan,
Expand Down
34 changes: 30 additions & 4 deletions datafusion/optimizer/src/replace_distinct_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
on_expr,
sort_expr,
input,
..
schema,
})) => {
// Construct the aggregation expression to be used to fetch the selected expressions.
let aggr_expr = select_expr
Expand Down Expand Up @@ -124,9 +124,12 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
.fields()
.iter()
.skip(on_expr.len())
.zip(select_expr.iter())
.map(|(f, sel)| {
Ok(col(f.qualified_column()).alias(sel.display_name()?))
.zip(schema.fields().iter())
.map(|(new_field, old_field)| {
Ok(col(new_field.qualified_column()).alias_qualified(
old_field.qualifier().cloned(),
old_field.name(),
))
})
.collect::<Result<Vec<Expr>>>()?;

Expand Down Expand Up @@ -174,4 +177,27 @@ mod tests {
expected,
)
}

#[test]
fn replace_distinct_on() -> datafusion_common::Result<()> {
let table_scan = test_table_scan().unwrap();
let plan = LogicalPlanBuilder::from(table_scan)
.distinct_on(
vec![col("a")],
vec![col("b")],
Some(vec![col("a").sort(false, true), col("c").sort(true, false)]),
)?
.build()?;

let expected = "Projection: FIRST_VALUE(test.b) ORDER BY [test.a DESC NULLS FIRST, test.c ASC NULLS LAST] AS b\
\n Sort: test.a DESC NULLS FIRST\
\n Aggregate: groupBy=[[test.a]], aggr=[[FIRST_VALUE(test.b) ORDER BY [test.a DESC NULLS FIRST, test.c ASC NULLS LAST]]]\
\n TableScan: test";

assert_optimized_plan_eq(
Arc::new(ReplaceDistinctWithAggregate::new()),
&plan,
expected,
)
}
}
7 changes: 5 additions & 2 deletions datafusion/optimizer/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use crate::analyzer::{Analyzer, AnalyzerRule};
use crate::optimizer::Optimizer;
use crate::optimizer::{assert_schema_is_the_same, Optimizer};
use crate::{OptimizerContext, OptimizerRule};
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::config::ConfigOptions;
Expand Down Expand Up @@ -155,14 +155,17 @@ pub fn assert_optimized_plan_eq(
plan: &LogicalPlan,
expected: &str,
) -> Result<()> {
let optimizer = Optimizer::with_rules(vec![rule]);
let optimizer = Optimizer::with_rules(vec![rule.clone()]);
let optimized_plan = optimizer
.optimize_recursively(
optimizer.rules.get(0).unwrap(),
plan,
&OptimizerContext::new(),
)?
.unwrap_or_else(|| plan.clone());

// Ensure schemas always match after an optimization
assert_schema_is_the_same(rule.name(), plan, &optimized_plan)?;
let formatted_plan = format!("{optimized_plan:?}");
assert_eq!(formatted_plan, expected);

Expand Down
3 changes: 2 additions & 1 deletion datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,8 @@ message Not {

message AliasNode {
LogicalExprNode expr = 1;
string alias = 2;
repeated OwnedTableReference relation = 2;
string alias = 3;
}

message BinaryExprNode {
Expand Down
17 changes: 17 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions datafusion/proto/src/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1149,6 +1149,11 @@ pub fn parse_expr(
}
ExprType::Alias(alias) => Ok(Expr::Alias(Alias::new(
parse_required_expr(alias.expr.as_deref(), registry, "expr")?,
alias
.relation
.first()
.map(|r| OwnedTableReference::try_from(r.clone()))
.transpose()?,
alias.alias.clone(),
))),
ExprType::IsNullExpr(is_null) => Ok(Expr::IsNull(Box::new(parse_required_expr(
Expand Down
10 changes: 9 additions & 1 deletion datafusion/proto/src/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,9 +476,17 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
Expr::Column(c) => Self {
expr_type: Some(ExprType::Column(c.into())),
},
Expr::Alias(Alias { expr, name, .. }) => {
Expr::Alias(Alias {
expr,
relation,
name,
}) => {
let alias = Box::new(protobuf::AliasNode {
expr: Some(Box::new(expr.as_ref().try_into()?)),
relation: relation
.to_owned()
.map(|r| vec![r.into()])
.unwrap_or(vec![]),
alias: name.to_owned(),
});
Self {
Expand Down
6 changes: 6 additions & 0 deletions datafusion/proto/tests/cases/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,12 @@ fn exact_roundtrip_linearized_binary_expr() {
}
}

#[test]
fn roundtrip_qualified_alias() {
let qual_alias = col("c1").alias_qualified(Some("my_table"), "my_column");
assert_eq!(qual_alias, roundtrip_expr(&qual_alias));
}

#[test]
fn roundtrip_deeply_nested_binary_expr() {
// We need more stack space so this doesn't overflow in dev builds
Expand Down
3 changes: 2 additions & 1 deletion datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.iter()
.filter(|select_expr| match select_expr {
Expr::AggregateFunction(_) | Expr::AggregateUDF(_) => false,
Expr::Alias(Alias { expr, name: _ }) => !matches!(
Expr::Alias(Alias { expr, name: _, .. }) => !matches!(
**expr,
Expr::AggregateFunction(_) | Expr::AggregateUDF(_)
),
Expand Down Expand Up @@ -487,6 +487,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.clone();
*expr = Expr::Alias(Alias {
expr: Box::new(new_expr),
relation: None,
name: name.clone(),
});
}
Expand Down
4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/distinct_on.slt
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,12 @@ query TT
EXPLAIN SELECT DISTINCT ON (c1) c3, c2 FROM aggregate_test_100 ORDER BY c1, c3;
----
logical_plan
Projection: FIRST_VALUE(aggregate_test_100.c3) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST] AS aggregate_test_100.c3, FIRST_VALUE(aggregate_test_100.c2) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST] AS aggregate_test_100.c2
Projection: FIRST_VALUE(aggregate_test_100.c3) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST] AS c3, FIRST_VALUE(aggregate_test_100.c2) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST] AS c2
--Sort: aggregate_test_100.c1 ASC NULLS LAST
----Aggregate: groupBy=[[aggregate_test_100.c1]], aggr=[[FIRST_VALUE(aggregate_test_100.c3) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST], FIRST_VALUE(aggregate_test_100.c2) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST]]]
------TableScan: aggregate_test_100 projection=[c1, c2, c3]
physical_plan
ProjectionExec: expr=[FIRST_VALUE(aggregate_test_100.c3) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST]@1 as aggregate_test_100.c3, FIRST_VALUE(aggregate_test_100.c2) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST]@2 as aggregate_test_100.c2]
ProjectionExec: expr=[FIRST_VALUE(aggregate_test_100.c3) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST]@1 as c3, FIRST_VALUE(aggregate_test_100.c2) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST]@2 as c2]
--SortPreservingMergeExec: [c1@0 ASC NULLS LAST]
----SortExec: expr=[c1@0 ASC NULLS LAST]
------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[FIRST_VALUE(aggregate_test_100.c3), FIRST_VALUE(aggregate_test_100.c2)]
Expand Down

0 comments on commit 8ebdc13

Please sign in to comment.