Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Expr::display_name to create physical_name #257

Merged
merged 1 commit into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions datafusion/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ path = "src/lib.rs"
avro = ["apache-avro"]
backtrace = []
pyarrow = ["pyo3", "arrow/pyarrow", "parquet"]
# Used for testing ONLY: causes all values to hash to the same value (test for collisions)
force_hash_collisions = []

[dependencies]
ahash = { workspace = true }
Expand Down
267 changes: 8 additions & 259 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

use std::borrow::Cow;
use std::collections::HashMap;
use std::fmt::Write;
use std::sync::Arc;

use crate::datasource::file_format::arrow::ArrowFormat;
Expand Down Expand Up @@ -82,11 +81,10 @@ use datafusion_common::{
};
use datafusion_expr::dml::CopyTo;
use datafusion_expr::expr::{
self, AggregateFunction, AggregateFunctionDefinition, Alias, Between, BinaryExpr,
Cast, GroupingSet, InList, Like, TryCast, WindowFunction,
self, AggregateFunction, AggregateFunctionDefinition, Alias, GroupingSet,
WindowFunction,
};
use datafusion_expr::expr_rewriter::unnormalize_cols;
use datafusion_expr::expr_vec_fmt;
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
use datafusion_expr::{
DescribeTable, DmlStatement, Extension, Filter, RecursiveQuery, StringifiedPlan,
Expand All @@ -105,260 +103,11 @@ use sqlparser::ast::NullTreatment;
use tokio::sync::Mutex;
use url::Url;

fn create_function_physical_name(
fun: &str,
distinct: bool,
args: &[Expr],
order_by: Option<&Vec<Expr>>,
) -> Result<String> {
let names: Vec<String> = args
.iter()
.map(|e| create_physical_name(e, false))
.collect::<Result<_>>()?;

let distinct_str = match distinct {
true => "DISTINCT ",
false => "",
};

let phys_name = format!("{}({}{})", fun, distinct_str, names.join(","));

Ok(order_by
.map(|order_by| format!("{} ORDER BY [{}]", phys_name, expr_vec_fmt!(order_by)))
.unwrap_or(phys_name))
}

fn physical_name(e: &Expr) -> Result<String> {
create_physical_name(e, true)
}

fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
match e {
Expr::Unnest(_) => {
internal_err!(
"Expr::Unnest should have been converted to LogicalPlan::Unnest"
)
}
Expr::Column(c) => {
if is_first_expr {
Ok(c.name.clone())
} else {
Ok(c.flat_name())
}
}
Expr::Alias(Alias { name, .. }) => Ok(name.clone()),
Expr::ScalarVariable(_, variable_names) => Ok(variable_names.join(".")),
Expr::Literal(value) => Ok(format!("{value:?}")),
Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
let left = create_physical_name(left, false)?;
let right = create_physical_name(right, false)?;
Ok(format!("{left} {op} {right}"))
}
Expr::Case(case) => {
let mut name = "CASE ".to_string();
if let Some(e) = &case.expr {
let _ = write!(name, "{e} ");
}
for (w, t) in &case.when_then_expr {
let _ = write!(name, "WHEN {w} THEN {t} ");
}
if let Some(e) = &case.else_expr {
let _ = write!(name, "ELSE {e} ");
}
name += "END";
Ok(name)
}
Expr::Cast(Cast { expr, .. }) => {
// CAST does not change the expression name
create_physical_name(expr, false)
}
Expr::TryCast(TryCast { expr, .. }) => {
// CAST does not change the expression name
create_physical_name(expr, false)
}
Expr::Not(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("NOT {expr}"))
}
Expr::Negative(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("(- {expr})"))
}
Expr::IsNull(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS NULL"))
}
Expr::IsNotNull(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS NOT NULL"))
}
Expr::IsTrue(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS TRUE"))
}
Expr::IsFalse(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS FALSE"))
}
Expr::IsUnknown(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS UNKNOWN"))
}
Expr::IsNotTrue(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS NOT TRUE"))
}
Expr::IsNotFalse(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS NOT FALSE"))
}
Expr::IsNotUnknown(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS NOT UNKNOWN"))
}
Expr::ScalarFunction(fun) => fun.func.display_name(&fun.args),
Expr::WindowFunction(WindowFunction {
fun,
args,
order_by,
..
}) => {
create_function_physical_name(&fun.to_string(), false, args, Some(order_by))
}
Expr::AggregateFunction(AggregateFunction {
func_def,
distinct,
args,
filter: _,
order_by,
null_treatment: _,
}) => create_function_physical_name(
func_def.name(),
*distinct,
args,
order_by.as_ref(),
),
Expr::GroupingSet(grouping_set) => match grouping_set {
GroupingSet::Rollup(exprs) => Ok(format!(
"ROLLUP ({})",
exprs
.iter()
.map(|e| create_physical_name(e, false))
.collect::<Result<Vec<_>>>()?
.join(", ")
)),
GroupingSet::Cube(exprs) => Ok(format!(
"CUBE ({})",
exprs
.iter()
.map(|e| create_physical_name(e, false))
.collect::<Result<Vec<_>>>()?
.join(", ")
)),
GroupingSet::GroupingSets(lists_of_exprs) => {
let mut strings = vec![];
for exprs in lists_of_exprs {
let exprs_str = exprs
.iter()
.map(|e| create_physical_name(e, false))
.collect::<Result<Vec<_>>>()?
.join(", ");
strings.push(format!("({exprs_str})"));
}
Ok(format!("GROUPING SETS ({})", strings.join(", ")))
}
},

Expr::InList(InList {
expr,
list,
negated,
}) => {
let expr = create_physical_name(expr, false)?;
let list = list.iter().map(|expr| create_physical_name(expr, false));
if *negated {
Ok(format!("{expr} NOT IN ({list:?})"))
} else {
Ok(format!("{expr} IN ({list:?})"))
}
}
Expr::Exists { .. } => {
not_impl_err!("EXISTS is not yet supported in the physical plan")
}
Expr::InSubquery(_) => {
not_impl_err!("IN subquery is not yet supported in the physical plan")
}
Expr::ScalarSubquery(_) => {
not_impl_err!("Scalar subqueries are not yet supported in the physical plan")
}
Expr::Between(Between {
expr,
negated,
low,
high,
}) => {
let expr = create_physical_name(expr, false)?;
let low = create_physical_name(low, false)?;
let high = create_physical_name(high, false)?;
if *negated {
Ok(format!("{expr} NOT BETWEEN {low} AND {high}"))
} else {
Ok(format!("{expr} BETWEEN {low} AND {high}"))
}
}
Expr::Like(Like {
negated,
expr,
pattern,
escape_char,
case_insensitive,
}) => {
let expr = create_physical_name(expr, false)?;
let pattern = create_physical_name(pattern, false)?;
let op_name = if *case_insensitive { "ILIKE" } else { "LIKE" };
let escape = if let Some(char) = escape_char {
format!("CHAR '{char}'")
} else {
"".to_string()
};
if *negated {
Ok(format!("{expr} NOT {op_name} {pattern}{escape}"))
} else {
Ok(format!("{expr} {op_name} {pattern}{escape}"))
}
}
Expr::SimilarTo(Like {
negated,
expr,
pattern,
escape_char,
case_insensitive: _,
}) => {
let expr = create_physical_name(expr, false)?;
let pattern = create_physical_name(pattern, false)?;
let escape = if let Some(char) = escape_char {
format!("CHAR '{char}'")
} else {
"".to_string()
};
if *negated {
Ok(format!("{expr} NOT SIMILAR TO {pattern}{escape}"))
} else {
Ok(format!("{expr} SIMILAR TO {pattern}{escape}"))
}
}
Expr::Sort { .. } => {
internal_err!("Create physical name does not support sort expression")
}
Expr::Wildcard { .. } => {
internal_err!("Create physical name does not support wildcard")
}
Expr::Placeholder(_) => {
internal_err!("Create physical name does not support placeholder")
}
Expr::OuterReferenceColumn(_, _) => {
internal_err!("Create physical name does not support OuterReferenceColumn")
}
fn physical_name(expr: &Expr) -> Result<String> {
if let Expr::Column(col) = expr {
Ok(col.name.clone())
} else {
expr.display_name()
}
}

Expand Down Expand Up @@ -1709,7 +1458,7 @@ fn get_null_physical_expr_pair(
) -> Result<(Arc<dyn PhysicalExpr>, String)> {
let physical_expr =
create_physical_expr(expr, input_dfschema, session_state.execution_props())?;
let physical_name = physical_name(&expr.clone())?;
let physical_name = physical_name(expr)?;

let data_type = physical_expr.data_type(input_schema)?;
let null_value: ScalarValue = (&data_type).try_into()?;
Expand Down
20 changes: 10 additions & 10 deletions datafusion/core/tests/dataframe/dataframe_functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,11 +361,11 @@ async fn test_fn_approx_percentile_cont() -> Result<()> {
let expr = approx_percentile_cont(col("b"), lit(0.5));

let expected = [
"+---------------------------------------------+",
"| APPROX_PERCENTILE_CONT(test.b,Float64(0.5)) |",
"+---------------------------------------------+",
"| 10 |",
"+---------------------------------------------+",
"+----------------------------------------------+",
"| APPROX_PERCENTILE_CONT(test.b, Float64(0.5)) |",
"+----------------------------------------------+",
"| 10 |",
"+----------------------------------------------+",
];

let df = create_test_table().await?;
Expand All @@ -382,11 +382,11 @@ async fn test_fn_approx_percentile_cont() -> Result<()> {
let expr = approx_percentile_cont(col("b"), alias_expr);
let df = create_test_table().await?;
let expected = [
"+--------------------------------------+",
"| APPROX_PERCENTILE_CONT(test.b,arg_2) |",
"+--------------------------------------+",
"| 10 |",
"+--------------------------------------+",
"+---------------------------------------+",
"| APPROX_PERCENTILE_CONT(test.b, arg_2) |",
"+---------------------------------------+",
"| 10 |",
"+---------------------------------------+",
];
let batches = df.aggregate(vec![], vec![expr]).unwrap().collect().await?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ async fn test_udaf_returning_struct() {
let sql = "SELECT first(value, time) from t";
let expected = [
"+------------------------------------------------+",
"| first(t.value,t.time) |",
"| first(t.value, t.time) |",
"+------------------------------------------------+",
"| {value: 2.0, time: 1970-01-01T00:00:00.000002} |",
"+------------------------------------------------+",
Expand Down
Loading
Loading