Skip to content

Commit

Permalink
Use Expr::display_name to create physical_name (#257)
Browse files Browse the repository at this point in the history
Less opportunities for bugs
  • Loading branch information
joroKr21 authored Aug 21, 2024
1 parent 7f3eac7 commit 11ed341
Show file tree
Hide file tree
Showing 13 changed files with 146 additions and 396 deletions.
2 changes: 2 additions & 0 deletions datafusion/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ path = "src/lib.rs"
avro = ["apache-avro"]
backtrace = []
pyarrow = ["pyo3", "arrow/pyarrow", "parquet"]
# Used for testing ONLY: causes all values to hash to the same value (test for collisions)
force_hash_collisions = []

[dependencies]
ahash = { workspace = true }
Expand Down
267 changes: 8 additions & 259 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

use std::borrow::Cow;
use std::collections::HashMap;
use std::fmt::Write;
use std::sync::Arc;

use crate::datasource::file_format::arrow::ArrowFormat;
Expand Down Expand Up @@ -82,11 +81,10 @@ use datafusion_common::{
};
use datafusion_expr::dml::CopyTo;
use datafusion_expr::expr::{
self, AggregateFunction, AggregateFunctionDefinition, Alias, Between, BinaryExpr,
Cast, GroupingSet, InList, Like, TryCast, WindowFunction,
self, AggregateFunction, AggregateFunctionDefinition, Alias, GroupingSet,
WindowFunction,
};
use datafusion_expr::expr_rewriter::unnormalize_cols;
use datafusion_expr::expr_vec_fmt;
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
use datafusion_expr::{
DescribeTable, DmlStatement, Extension, Filter, RecursiveQuery, StringifiedPlan,
Expand All @@ -105,260 +103,11 @@ use sqlparser::ast::NullTreatment;
use tokio::sync::Mutex;
use url::Url;

fn create_function_physical_name(
fun: &str,
distinct: bool,
args: &[Expr],
order_by: Option<&Vec<Expr>>,
) -> Result<String> {
let names: Vec<String> = args
.iter()
.map(|e| create_physical_name(e, false))
.collect::<Result<_>>()?;

let distinct_str = match distinct {
true => "DISTINCT ",
false => "",
};

let phys_name = format!("{}({}{})", fun, distinct_str, names.join(","));

Ok(order_by
.map(|order_by| format!("{} ORDER BY [{}]", phys_name, expr_vec_fmt!(order_by)))
.unwrap_or(phys_name))
}

fn physical_name(e: &Expr) -> Result<String> {
create_physical_name(e, true)
}

fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
match e {
Expr::Unnest(_) => {
internal_err!(
"Expr::Unnest should have been converted to LogicalPlan::Unnest"
)
}
Expr::Column(c) => {
if is_first_expr {
Ok(c.name.clone())
} else {
Ok(c.flat_name())
}
}
Expr::Alias(Alias { name, .. }) => Ok(name.clone()),
Expr::ScalarVariable(_, variable_names) => Ok(variable_names.join(".")),
Expr::Literal(value) => Ok(format!("{value:?}")),
Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
let left = create_physical_name(left, false)?;
let right = create_physical_name(right, false)?;
Ok(format!("{left} {op} {right}"))
}
Expr::Case(case) => {
let mut name = "CASE ".to_string();
if let Some(e) = &case.expr {
let _ = write!(name, "{e} ");
}
for (w, t) in &case.when_then_expr {
let _ = write!(name, "WHEN {w} THEN {t} ");
}
if let Some(e) = &case.else_expr {
let _ = write!(name, "ELSE {e} ");
}
name += "END";
Ok(name)
}
Expr::Cast(Cast { expr, .. }) => {
// CAST does not change the expression name
create_physical_name(expr, false)
}
Expr::TryCast(TryCast { expr, .. }) => {
// CAST does not change the expression name
create_physical_name(expr, false)
}
Expr::Not(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("NOT {expr}"))
}
Expr::Negative(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("(- {expr})"))
}
Expr::IsNull(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS NULL"))
}
Expr::IsNotNull(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS NOT NULL"))
}
Expr::IsTrue(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS TRUE"))
}
Expr::IsFalse(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS FALSE"))
}
Expr::IsUnknown(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS UNKNOWN"))
}
Expr::IsNotTrue(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS NOT TRUE"))
}
Expr::IsNotFalse(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS NOT FALSE"))
}
Expr::IsNotUnknown(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS NOT UNKNOWN"))
}
Expr::ScalarFunction(fun) => fun.func.display_name(&fun.args),
Expr::WindowFunction(WindowFunction {
fun,
args,
order_by,
..
}) => {
create_function_physical_name(&fun.to_string(), false, args, Some(order_by))
}
Expr::AggregateFunction(AggregateFunction {
func_def,
distinct,
args,
filter: _,
order_by,
null_treatment: _,
}) => create_function_physical_name(
func_def.name(),
*distinct,
args,
order_by.as_ref(),
),
Expr::GroupingSet(grouping_set) => match grouping_set {
GroupingSet::Rollup(exprs) => Ok(format!(
"ROLLUP ({})",
exprs
.iter()
.map(|e| create_physical_name(e, false))
.collect::<Result<Vec<_>>>()?
.join(", ")
)),
GroupingSet::Cube(exprs) => Ok(format!(
"CUBE ({})",
exprs
.iter()
.map(|e| create_physical_name(e, false))
.collect::<Result<Vec<_>>>()?
.join(", ")
)),
GroupingSet::GroupingSets(lists_of_exprs) => {
let mut strings = vec![];
for exprs in lists_of_exprs {
let exprs_str = exprs
.iter()
.map(|e| create_physical_name(e, false))
.collect::<Result<Vec<_>>>()?
.join(", ");
strings.push(format!("({exprs_str})"));
}
Ok(format!("GROUPING SETS ({})", strings.join(", ")))
}
},

Expr::InList(InList {
expr,
list,
negated,
}) => {
let expr = create_physical_name(expr, false)?;
let list = list.iter().map(|expr| create_physical_name(expr, false));
if *negated {
Ok(format!("{expr} NOT IN ({list:?})"))
} else {
Ok(format!("{expr} IN ({list:?})"))
}
}
Expr::Exists { .. } => {
not_impl_err!("EXISTS is not yet supported in the physical plan")
}
Expr::InSubquery(_) => {
not_impl_err!("IN subquery is not yet supported in the physical plan")
}
Expr::ScalarSubquery(_) => {
not_impl_err!("Scalar subqueries are not yet supported in the physical plan")
}
Expr::Between(Between {
expr,
negated,
low,
high,
}) => {
let expr = create_physical_name(expr, false)?;
let low = create_physical_name(low, false)?;
let high = create_physical_name(high, false)?;
if *negated {
Ok(format!("{expr} NOT BETWEEN {low} AND {high}"))
} else {
Ok(format!("{expr} BETWEEN {low} AND {high}"))
}
}
Expr::Like(Like {
negated,
expr,
pattern,
escape_char,
case_insensitive,
}) => {
let expr = create_physical_name(expr, false)?;
let pattern = create_physical_name(pattern, false)?;
let op_name = if *case_insensitive { "ILIKE" } else { "LIKE" };
let escape = if let Some(char) = escape_char {
format!("CHAR '{char}'")
} else {
"".to_string()
};
if *negated {
Ok(format!("{expr} NOT {op_name} {pattern}{escape}"))
} else {
Ok(format!("{expr} {op_name} {pattern}{escape}"))
}
}
Expr::SimilarTo(Like {
negated,
expr,
pattern,
escape_char,
case_insensitive: _,
}) => {
let expr = create_physical_name(expr, false)?;
let pattern = create_physical_name(pattern, false)?;
let escape = if let Some(char) = escape_char {
format!("CHAR '{char}'")
} else {
"".to_string()
};
if *negated {
Ok(format!("{expr} NOT SIMILAR TO {pattern}{escape}"))
} else {
Ok(format!("{expr} SIMILAR TO {pattern}{escape}"))
}
}
Expr::Sort { .. } => {
internal_err!("Create physical name does not support sort expression")
}
Expr::Wildcard { .. } => {
internal_err!("Create physical name does not support wildcard")
}
Expr::Placeholder(_) => {
internal_err!("Create physical name does not support placeholder")
}
Expr::OuterReferenceColumn(_, _) => {
internal_err!("Create physical name does not support OuterReferenceColumn")
}
fn physical_name(expr: &Expr) -> Result<String> {
if let Expr::Column(col) = expr {
Ok(col.name.clone())
} else {
expr.display_name()
}
}

Expand Down Expand Up @@ -1709,7 +1458,7 @@ fn get_null_physical_expr_pair(
) -> Result<(Arc<dyn PhysicalExpr>, String)> {
let physical_expr =
create_physical_expr(expr, input_dfschema, session_state.execution_props())?;
let physical_name = physical_name(&expr.clone())?;
let physical_name = physical_name(expr)?;

let data_type = physical_expr.data_type(input_schema)?;
let null_value: ScalarValue = (&data_type).try_into()?;
Expand Down
20 changes: 10 additions & 10 deletions datafusion/core/tests/dataframe/dataframe_functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,11 +361,11 @@ async fn test_fn_approx_percentile_cont() -> Result<()> {
let expr = approx_percentile_cont(col("b"), lit(0.5));

let expected = [
"+---------------------------------------------+",
"| APPROX_PERCENTILE_CONT(test.b,Float64(0.5)) |",
"+---------------------------------------------+",
"| 10 |",
"+---------------------------------------------+",
"+----------------------------------------------+",
"| APPROX_PERCENTILE_CONT(test.b, Float64(0.5)) |",
"+----------------------------------------------+",
"| 10 |",
"+----------------------------------------------+",
];

let df = create_test_table().await?;
Expand All @@ -382,11 +382,11 @@ async fn test_fn_approx_percentile_cont() -> Result<()> {
let expr = approx_percentile_cont(col("b"), alias_expr);
let df = create_test_table().await?;
let expected = [
"+--------------------------------------+",
"| APPROX_PERCENTILE_CONT(test.b,arg_2) |",
"+--------------------------------------+",
"| 10 |",
"+--------------------------------------+",
"+---------------------------------------+",
"| APPROX_PERCENTILE_CONT(test.b, arg_2) |",
"+---------------------------------------+",
"| 10 |",
"+---------------------------------------+",
];
let batches = df.aggregate(vec![], vec![expr]).unwrap().collect().await?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ async fn test_udaf_returning_struct() {
let sql = "SELECT first(value, time) from t";
let expected = [
"+------------------------------------------------+",
"| first(t.value,t.time) |",
"| first(t.value, t.time) |",
"+------------------------------------------------+",
"| {value: 2.0, time: 1970-01-01T00:00:00.000002} |",
"+------------------------------------------------+",
Expand Down
Loading

0 comments on commit 11ed341

Please sign in to comment.