From ba92a84ee32725e2179d467319ccdb4e5f720d3d Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Tue, 16 Jul 2024 10:31:20 +0530 Subject: [PATCH 01/18] move get_field to expr planner --- datafusion/expr/src/planner.rs | 7 ++++++ datafusion/functions/src/core/planner.rs | 6 +++++ datafusion/sql/src/expr/identifier.rs | 30 ++++++++++++++---------- 3 files changed, 30 insertions(+), 13 deletions(-) diff --git a/datafusion/expr/src/planner.rs b/datafusion/expr/src/planner.rs index 2f13923b1f10..90d1cec1b26f 100644 --- a/datafusion/expr/src/planner.rs +++ b/datafusion/expr/src/planner.rs @@ -168,6 +168,13 @@ pub trait ExprPlanner: Send + Sync { fn plan_overlay(&self, args: Vec) -> Result>> { Ok(PlannerResult::Original(args)) } + + /// Plans a multi-part identifier, e.g. `table_alias.column` or `schema.table.col` + /// + /// Returns origin expression arguments if not possible + fn plan_get_field(&self, args: Vec) -> Result>> { + Ok(PlannerResult::Original(args)) + } } /// An operator with two arguments to plan diff --git a/datafusion/functions/src/core/planner.rs b/datafusion/functions/src/core/planner.rs index 63eaa9874c2b..bcdddd0fbfa1 100644 --- a/datafusion/functions/src/core/planner.rs +++ b/datafusion/functions/src/core/planner.rs @@ -62,4 +62,10 @@ impl ExprPlanner for CoreFunctionPlanner { ScalarFunction::new_udf(crate::string::overlay(), args), ))) } + + fn plan_get_field(&self, args: Vec) -> Result>> { + Ok(PlannerResult::Planned(Expr::ScalarFunction( + ScalarFunction::new_udf(crate::core::get_field(), args), + ))) + } } diff --git a/datafusion/sql/src/expr/identifier.rs b/datafusion/sql/src/expr/identifier.rs index 39736b1fbba5..a13aa69abfb7 100644 --- a/datafusion/sql/src/expr/identifier.rs +++ b/datafusion/sql/src/expr/identifier.rs @@ -15,14 +15,17 @@ // specific language governing permissions and limitations // under the License. -use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; use arrow_schema::Field; +use sqlparser::ast::{Expr as SQLExpr, Ident}; + use datafusion_common::{ - internal_err, not_impl_err, plan_datafusion_err, Column, DFSchema, DataFusionError, + Column, DataFusionError, DFSchema, internal_err, not_impl_err, plan_datafusion_err, Result, ScalarValue, TableReference, }; -use datafusion_expr::{expr::ScalarFunction, lit, Case, Expr}; -use sqlparser::ast::{Expr as SQLExpr, Ident}; +use datafusion_expr::{Case, Expr, lit}; +use datafusion_expr::planner::PlannerResult; + +use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; impl<'a, S: ContextProvider> SqlToRel<'a, S> { pub(super) fn sql_identifier_to_expr( @@ -135,16 +138,17 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let nested_name = nested_names[0].to_string(); let col = Expr::Column(Column::from((qualifier, field))); - if let Some(udf) = - self.context_provider.get_function_meta("get_field") - { - Ok(Expr::ScalarFunction(ScalarFunction::new_udf( - udf, - vec![col, lit(ScalarValue::from(nested_name))], - ))) - } else { - internal_err!("get_field not found") + let mut get_field_args = + vec![col, lit(ScalarValue::from(nested_name))]; + for planner in self.planners.iter() { + match planner.plan_get_field(get_field_args)? { + PlannerResult::Planned(expr) => return Ok(expr), + PlannerResult::Original(args) => get_field_args = args, + } } + not_impl_err!( + "GetField not supported by ExprPlanner: {get_field_args:?}" + ) } // found matching field with no spare identifier(s) Some((field, qualifier, _nested_names)) => { From 44118b9cd9ee8921f8f7e93ef80b5fc7126d25b7 Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Tue, 16 Jul 2024 10:39:25 +0530 Subject: [PATCH 02/18] formatting --- datafusion/sql/src/expr/identifier.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/sql/src/expr/identifier.rs b/datafusion/sql/src/expr/identifier.rs index a13aa69abfb7..061340d204c6 100644 --- a/datafusion/sql/src/expr/identifier.rs +++ b/datafusion/sql/src/expr/identifier.rs @@ -19,11 +19,11 @@ use arrow_schema::Field; use sqlparser::ast::{Expr as SQLExpr, Ident}; use datafusion_common::{ - Column, DataFusionError, DFSchema, internal_err, not_impl_err, plan_datafusion_err, + internal_err, not_impl_err, plan_datafusion_err, Column, DFSchema, DataFusionError, Result, ScalarValue, TableReference, }; -use datafusion_expr::{Case, Expr, lit}; use datafusion_expr::planner::PlannerResult; +use datafusion_expr::{lit, Case, Expr}; use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; From 404347e5b95e199a62da377d5e67f34e8c157cb3 Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Tue, 16 Jul 2024 10:41:10 +0530 Subject: [PATCH 03/18] formatting --- datafusion/expr/src/planner.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/expr/src/planner.rs b/datafusion/expr/src/planner.rs index 90d1cec1b26f..dd79f14c718a 100644 --- a/datafusion/expr/src/planner.rs +++ b/datafusion/expr/src/planner.rs @@ -169,8 +169,8 @@ pub trait ExprPlanner: Send + Sync { Ok(PlannerResult::Original(args)) } - /// Plans a multi-part identifier, e.g. `table_alias.column` or `schema.table.col` - /// + /// Plans a field expression. + /// FIXME: Add documentation /// Returns origin expression arguments if not possible fn plan_get_field(&self, args: Vec) -> Result>> { Ok(PlannerResult::Original(args)) From 1adc9a448aeff5e7e65732273cab674fbe26cf03 Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Tue, 16 Jul 2024 12:00:41 +0530 Subject: [PATCH 04/18] documentation --- datafusion/expr/src/planner.rs | 5 +++-- datafusion/functions/src/core/mod.rs | 1 - 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/expr/src/planner.rs b/datafusion/expr/src/planner.rs index dd79f14c718a..f27bca574b02 100644 --- a/datafusion/expr/src/planner.rs +++ b/datafusion/expr/src/planner.rs @@ -169,8 +169,9 @@ pub trait ExprPlanner: Send + Sync { Ok(PlannerResult::Original(args)) } - /// Plans a field expression. - /// FIXME: Add documentation + /// Plans expression to get an index field. Indexed field is only valid for `List`, `Struct`, `Map` or `Null`. + /// eg `expr["name"]` + /// /// Returns origin expression arguments if not possible fn plan_get_field(&self, args: Vec) -> Result>> { Ok(PlannerResult::Original(args)) diff --git a/datafusion/functions/src/core/mod.rs b/datafusion/functions/src/core/mod.rs index 31bce04beec1..45688dcb3811 100644 --- a/datafusion/functions/src/core/mod.rs +++ b/datafusion/functions/src/core/mod.rs @@ -105,7 +105,6 @@ pub fn functions() -> Vec> { nvl2(), arrow_typeof(), named_struct(), - get_field(), coalesce(), make_map(), map(), From 5f4ac70d443b0ffde3840e076c65907b1d5b1149 Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Tue, 16 Jul 2024 19:41:52 +0530 Subject: [PATCH 05/18] refactor --- datafusion-cli/Cargo.lock | 1 + datafusion/expr/src/planner.rs | 17 ++++++--- datafusion/functions/src/core/planner.rs | 37 ++++++++++++++++--- datafusion/optimizer/Cargo.toml | 1 + .../optimizer/tests/optimizer_integration.rs | 4 +- datafusion/sql/src/expr/identifier.rs | 34 +++++------------ datafusion/sql/tests/cases/plan_to_sql.rs | 11 ++++-- datafusion/sql/tests/sql_integration.rs | 5 ++- 8 files changed, 70 insertions(+), 40 deletions(-) diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 7da9cc427c37..7ebfc892d162 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1342,6 +1342,7 @@ dependencies = [ "chrono", "datafusion-common", "datafusion-expr", + "datafusion-functions", "datafusion-physical-expr", "hashbrown 0.14.5", "indexmap 2.2.6", diff --git a/datafusion/expr/src/planner.rs b/datafusion/expr/src/planner.rs index f27bca574b02..120b7191cf35 100644 --- a/datafusion/expr/src/planner.rs +++ b/datafusion/expr/src/planner.rs @@ -19,7 +19,7 @@ use std::sync::Arc; -use arrow::datatypes::{DataType, SchemaRef}; +use arrow::datatypes::{DataType, Field, SchemaRef}; use datafusion_common::{ config::ConfigOptions, file_options::file_type::FileType, not_impl_err, DFSchema, Result, TableReference, @@ -169,13 +169,20 @@ pub trait ExprPlanner: Send + Sync { Ok(PlannerResult::Original(args)) } - /// Plans expression to get an index field. Indexed field is only valid for `List`, `Struct`, `Map` or `Null`. - /// eg `expr["name"]` - /// - /// Returns origin expression arguments if not possible fn plan_get_field(&self, args: Vec) -> Result>> { Ok(PlannerResult::Original(args)) } + /// Plans compound identifier + /// + /// Returns origin expression arguments if not possible + fn plan_compound_identifier( + &self, + _filed: &Field, + _qualifier: Option<&TableReference>, + _nested_names: &[String], + ) -> Result>> { + Ok(PlannerResult::Original(vec![])) + } } /// An operator with two arguments to plan diff --git a/datafusion/functions/src/core/planner.rs b/datafusion/functions/src/core/planner.rs index bcdddd0fbfa1..b94da9b53892 100644 --- a/datafusion/functions/src/core/planner.rs +++ b/datafusion/functions/src/core/planner.rs @@ -15,11 +15,12 @@ // specific language governing permissions and limitations // under the License. -use datafusion_common::DFSchema; +use arrow::datatypes::Field; use datafusion_common::Result; +use datafusion_common::{not_impl_err, Column, DFSchema, ScalarValue, TableReference}; use datafusion_expr::expr::ScalarFunction; use datafusion_expr::planner::{ExprPlanner, PlannerResult, RawDictionaryExpr}; -use datafusion_expr::Expr; +use datafusion_expr::{lit, Expr}; use super::named_struct; @@ -63,9 +64,33 @@ impl ExprPlanner for CoreFunctionPlanner { ))) } - fn plan_get_field(&self, args: Vec) -> Result>> { - Ok(PlannerResult::Planned(Expr::ScalarFunction( - ScalarFunction::new_udf(crate::core::get_field(), args), - ))) + fn plan_compound_identifier( + &self, + field: &Field, + qualifier: Option<&TableReference>, + nested_names: &[String], + ) -> Result>> { + // found matching field with no spare identifier(s) + if nested_names.is_empty() { + Ok(PlannerResult::Planned(Expr::Column(Column::from(( + qualifier, field, + ))))) + } else { + // found matching field with spare identifier(s) for nested field(s) in structure + // TODO: remove when can support multiple nested identifiers + if nested_names.len() > 1 { + return not_impl_err!( + "Nested identifiers not yet supported for column {}", + Column::from((qualifier, field)).quoted_flat_name() + ); + } + let nested_name = nested_names[0].to_string(); + + let col = Expr::Column(Column::from((qualifier, field))); + let get_field_args = vec![col, lit(ScalarValue::from(nested_name))]; + Ok(PlannerResult::Planned(Expr::ScalarFunction( + ScalarFunction::new_udf(crate::core::get_field(), get_field_args), + ))) + } } } diff --git a/datafusion/optimizer/Cargo.toml b/datafusion/optimizer/Cargo.toml index 1a9e9630c076..54d8c472f13f 100644 --- a/datafusion/optimizer/Cargo.toml +++ b/datafusion/optimizer/Cargo.toml @@ -45,6 +45,7 @@ async-trait = { workspace = true } chrono = { workspace = true } datafusion-common = { workspace = true, default-features = true } datafusion-expr = { workspace = true } +datafusion-functions = { workspace = true } datafusion-physical-expr = { workspace = true } hashbrown = { workspace = true } indexmap = { workspace = true } diff --git a/datafusion/optimizer/tests/optimizer_integration.rs b/datafusion/optimizer/tests/optimizer_integration.rs index c0863839dba1..3b4e4c718313 100644 --- a/datafusion/optimizer/tests/optimizer_integration.rs +++ b/datafusion/optimizer/tests/optimizer_integration.rs @@ -25,6 +25,7 @@ use datafusion_common::config::ConfigOptions; use datafusion_common::{plan_err, Result}; use datafusion_expr::test::function_stub::sum_udaf; use datafusion_expr::{AggregateUDF, LogicalPlan, ScalarUDF, TableSource, WindowUDF}; +use datafusion_functions::core::planner::CoreFunctionPlanner; use datafusion_functions_aggregate::average::avg_udaf; use datafusion_functions_aggregate::count::count_udaf; use datafusion_optimizer::analyzer::Analyzer; @@ -344,7 +345,8 @@ fn test_sql(sql: &str) -> Result { .with_udaf(sum_udaf()) .with_udaf(count_udaf()) .with_udaf(avg_udaf()); - let sql_to_rel = SqlToRel::new(&context_provider); + let sql_to_rel = SqlToRel::new(&context_provider) + .with_user_defined_planner(Arc::new(CoreFunctionPlanner::default())); let plan = sql_to_rel.sql_statement_to_plan(statement.clone()).unwrap(); let config = OptimizerContext::new().with_skip_failing_rules(false); diff --git a/datafusion/sql/src/expr/identifier.rs b/datafusion/sql/src/expr/identifier.rs index 061340d204c6..e4c357f4e252 100644 --- a/datafusion/sql/src/expr/identifier.rs +++ b/datafusion/sql/src/expr/identifier.rs @@ -20,10 +20,10 @@ use sqlparser::ast::{Expr as SQLExpr, Ident}; use datafusion_common::{ internal_err, not_impl_err, plan_datafusion_err, Column, DFSchema, DataFusionError, - Result, ScalarValue, TableReference, + Result, TableReference, }; use datafusion_expr::planner::PlannerResult; -use datafusion_expr::{lit, Case, Expr}; +use datafusion_expr::{Case, Expr}; use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; @@ -127,32 +127,18 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let search_result = search_dfschema(&ids, schema); match search_result { // found matching field with spare identifier(s) for nested field(s) in structure - Some((field, qualifier, nested_names)) if !nested_names.is_empty() => { - // TODO: remove when can support multiple nested identifiers - if nested_names.len() > 1 { - return not_impl_err!( - "Nested identifiers not yet supported for column {}", - Column::from((qualifier, field)).quoted_flat_name() - ); - } - let nested_name = nested_names[0].to_string(); - - let col = Expr::Column(Column::from((qualifier, field))); - let mut get_field_args = - vec![col, lit(ScalarValue::from(nested_name))]; + Some((field, qualifier, nested_names)) => { for planner in self.planners.iter() { - match planner.plan_get_field(get_field_args)? { + match planner.plan_compound_identifier( + field, + qualifier, + nested_names, + )? { PlannerResult::Planned(expr) => return Ok(expr), - PlannerResult::Original(args) => get_field_args = args, + PlannerResult::Original(_args) => {} } } - not_impl_err!( - "GetField not supported by ExprPlanner: {get_field_args:?}" - ) - } - // found matching field with no spare identifier(s) - Some((field, qualifier, _nested_names)) => { - Ok(Expr::Column(Column::from((qualifier, field)))) + not_impl_err!("GetField not supported by ExprPlanner") } None => { // return default where use all identifiers to not have a nested field diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index 91295b2e8aae..22043e09e745 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::sync::Arc; use std::vec; use arrow_schema::*; @@ -28,6 +29,7 @@ use datafusion_sql::unparser::dialect::{ }; use datafusion_sql::unparser::{expr_to_sql, plan_to_sql, Unparser}; +use datafusion_functions::core::planner::CoreFunctionPlanner; use sqlparser::dialect::{Dialect, GenericDialect, MySqlDialect}; use sqlparser::parser::Parser; @@ -156,7 +158,8 @@ fn roundtrip_statement() -> Result<()> { let context = MockContextProvider::default() .with_udaf(sum_udaf()) .with_udaf(count_udaf()); - let sql_to_rel = SqlToRel::new(&context); + let sql_to_rel = SqlToRel::new(&context) + .with_user_defined_planner(Arc::new(CoreFunctionPlanner::default())); let plan = sql_to_rel.sql_statement_to_plan(statement).unwrap(); let roundtrip_statement = plan_to_sql(&plan)?; @@ -185,7 +188,8 @@ fn roundtrip_crossjoin() -> Result<()> { .parse_statement()?; let context = MockContextProvider::default(); - let sql_to_rel = SqlToRel::new(&context); + let sql_to_rel = SqlToRel::new(&context) + .with_user_defined_planner(Arc::new(CoreFunctionPlanner::default())); let plan = sql_to_rel.sql_statement_to_plan(statement).unwrap(); let roundtrip_statement = plan_to_sql(&plan)?; @@ -248,7 +252,8 @@ fn roundtrip_statement_with_dialect() -> Result<()> { .parse_statement()?; let context = MockContextProvider::default(); - let sql_to_rel = SqlToRel::new(&context); + let sql_to_rel = SqlToRel::new(&context) + .with_user_defined_planner(Arc::new(CoreFunctionPlanner::default())); let plan = sql_to_rel .sql_statement_to_plan(statement) .unwrap_or_else(|e| panic!("Failed to parse sql: {}\n{e}", query.sql)); diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index aca0d040bb8d..cfa50d66f74a 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -18,6 +18,7 @@ use std::any::Any; #[cfg(test)] use std::collections::HashMap; +use std::sync::Arc; use std::vec; use arrow_schema::TimeUnit::Nanosecond; @@ -37,6 +38,7 @@ use datafusion_sql::{ planner::{ParserOptions, SqlToRel}, }; +use datafusion_functions::core::planner::CoreFunctionPlanner; use datafusion_functions_aggregate::{ approx_median::approx_median_udaf, count::count_udaf, }; @@ -2696,7 +2698,8 @@ fn logical_plan_with_dialect_and_options( .with_udaf(avg_udaf()) .with_udaf(grouping_udaf()); - let planner = SqlToRel::new_with_options(&context, options); + let planner = SqlToRel::new_with_options(&context, options) + .with_user_defined_planner(Arc::new(CoreFunctionPlanner::default())); let result = DFParser::parse_sql_with_dialect(sql, dialect); let mut ast = result?; planner.statement_to_plan(ast.pop_front().unwrap()) From 38debfc5e941b090efe801252c149b571dcb6c2f Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Wed, 17 Jul 2024 22:41:33 +0530 Subject: [PATCH 06/18] documentation & fix's --- datafusion/expr/src/planner.rs | 7 +++++-- datafusion/sql/examples/sql.rs | 4 +++- datafusion/sql/src/expr/identifier.rs | 4 +++- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/datafusion/expr/src/planner.rs b/datafusion/expr/src/planner.rs index 120b7191cf35..a69908af1b6a 100644 --- a/datafusion/expr/src/planner.rs +++ b/datafusion/expr/src/planner.rs @@ -172,9 +172,12 @@ pub trait ExprPlanner: Send + Sync { fn plan_get_field(&self, args: Vec) -> Result>> { Ok(PlannerResult::Original(args)) } - /// Plans compound identifier + /// Plans compound identifier eg `db.schema.table`. /// - /// Returns origin expression arguments if not possible + /// Note: + /// Currently compound identifier for outer query schema is not supported. + /// + /// Returns empty expression arguments if not possible fn plan_compound_identifier( &self, _filed: &Field, diff --git a/datafusion/sql/examples/sql.rs b/datafusion/sql/examples/sql.rs index 1b92a7e116b1..141a56716c81 100644 --- a/datafusion/sql/examples/sql.rs +++ b/datafusion/sql/examples/sql.rs @@ -22,6 +22,7 @@ use datafusion_expr::WindowUDF; use datafusion_expr::{ logical_plan::builder::LogicalTableSource, AggregateUDF, ScalarUDF, TableSource, }; +use datafusion_functions::core::planner::CoreFunctionPlanner; use datafusion_functions_aggregate::count::count_udaf; use datafusion_functions_aggregate::sum::sum_udaf; use datafusion_sql::{ @@ -54,7 +55,8 @@ fn main() { let context_provider = MyContextProvider::new() .with_udaf(sum_udaf()) .with_udaf(count_udaf()); - let sql_to_rel = SqlToRel::new(&context_provider); + let sql_to_rel = SqlToRel::new(&context_provider) + .with_user_defined_planner(Arc::new(CoreFunctionPlanner::default())); let plan = sql_to_rel.sql_statement_to_plan(statement.clone()).unwrap(); // show the plan diff --git a/datafusion/sql/src/expr/identifier.rs b/datafusion/sql/src/expr/identifier.rs index e4c357f4e252..038e974ad613 100644 --- a/datafusion/sql/src/expr/identifier.rs +++ b/datafusion/sql/src/expr/identifier.rs @@ -138,7 +138,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { PlannerResult::Original(_args) => {} } } - not_impl_err!("GetField not supported by ExprPlanner") + not_impl_err!( + "Compound identifiers not supported by ExprPlanner: {ids:?}" + ) } None => { // return default where use all identifiers to not have a nested field From 6623219ccf547abdf7c5774d0b5532bcab165039 Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Thu, 18 Jul 2024 10:01:52 +0530 Subject: [PATCH 07/18] move optimizer tests to core --- .../core/tests/optimizer_integration.rs | 137 +++++++++++++++++- .../optimizer/tests/optimizer_integration.rs | 137 +----------------- 2 files changed, 137 insertions(+), 137 deletions(-) diff --git a/datafusion/core/tests/optimizer_integration.rs b/datafusion/core/tests/optimizer_integration.rs index 39f745cd3309..5c6d0c673aa4 100644 --- a/datafusion/core/tests/optimizer_integration.rs +++ b/datafusion/core/tests/optimizer_integration.rs @@ -44,6 +44,7 @@ use datafusion_sql::sqlparser::parser::Parser; use datafusion_sql::TableReference; use chrono::DateTime; +use datafusion_functions::core::planner::CoreFunctionPlanner; use datafusion_functions::datetime; #[cfg(test)] @@ -115,6 +116,139 @@ fn concat_ws_literals() -> Result<()> { Ok(()) } +#[test] +fn anti_join_with_join_filter() -> Result<()> { + // regression test for https://github.com/apache/datafusion/issues/2888 + let sql = "SELECT col_utf8 FROM test WHERE NOT EXISTS (\ + SELECT col_utf8 FROM test t2 WHERE test.col_int32 = t2.col_int32 \ + AND test.col_uint32 != t2.col_uint32)"; + let plan = test_sql(sql)?; + let expected = "Projection: test.col_utf8\ + \n LeftAnti Join: test.col_int32 = __correlated_sq_1.col_int32 Filter: test.col_uint32 != __correlated_sq_1.col_uint32\ + \n TableScan: test projection=[col_int32, col_uint32, col_utf8]\ + \n SubqueryAlias: __correlated_sq_1\ + \n SubqueryAlias: t2\ + \n TableScan: test projection=[col_int32, col_uint32]"; + assert_eq!(expected, format!("{plan:?}")); + Ok(()) +} + +#[test] +fn where_exists_distinct() -> Result<()> { + let sql = "SELECT col_int32 FROM test WHERE EXISTS (\ + SELECT DISTINCT col_int32 FROM test t2 WHERE test.col_int32 = t2.col_int32)"; + let plan = test_sql(sql)?; + let expected = "LeftSemi Join: test.col_int32 = __correlated_sq_1.col_int32\ + \n TableScan: test projection=[col_int32]\ + \n SubqueryAlias: __correlated_sq_1\ + \n Aggregate: groupBy=[[t2.col_int32]], aggr=[[]]\ + \n SubqueryAlias: t2\ + \n TableScan: test projection=[col_int32]"; + assert_eq!(expected, format!("{plan:?}")); + Ok(()) +} + +#[test] +fn propagate_empty_relation() { + let sql = "SELECT test.col_int32 FROM test JOIN ( SELECT col_int32 FROM test WHERE false ) AS ta1 ON test.col_int32 = ta1.col_int32;"; + let plan = test_sql(sql).unwrap(); + // when children exist EmptyRelation, it will bottom-up propagate. + let expected = "EmptyRelation"; + assert_eq!(expected, format!("{plan:?}")); +} + +#[test] +fn join_keys_in_subquery_alias() { + let sql = "SELECT * FROM test AS A, ( SELECT col_int32 as key FROM test ) AS B where A.col_int32 = B.key;"; + let plan = test_sql(sql).unwrap(); + let expected = "Inner Join: a.col_int32 = b.key\ + \n SubqueryAlias: a\ + \n Filter: test.col_int32 IS NOT NULL\ + \n TableScan: test projection=[col_int32, col_uint32, col_utf8, col_date32, col_date64, col_ts_nano_none, col_ts_nano_utc]\ + \n SubqueryAlias: b\ + \n Projection: test.col_int32 AS key\ + \n Filter: test.col_int32 IS NOT NULL\ + \n TableScan: test projection=[col_int32]"; + + assert_eq!(expected, format!("{plan:?}")); +} + +#[test] +fn join_keys_in_subquery_alias_1() { + let sql = "SELECT * FROM test AS A, ( SELECT test.col_int32 AS key FROM test JOIN test AS C on test.col_int32 = C.col_int32 ) AS B where A.col_int32 = B.key;"; + let plan = test_sql(sql).unwrap(); + let expected = "Inner Join: a.col_int32 = b.key\ + \n SubqueryAlias: a\ + \n Filter: test.col_int32 IS NOT NULL\ + \n TableScan: test projection=[col_int32, col_uint32, col_utf8, col_date32, col_date64, col_ts_nano_none, col_ts_nano_utc]\ + \n SubqueryAlias: b\ + \n Projection: test.col_int32 AS key\ + \n Inner Join: test.col_int32 = c.col_int32\ + \n Filter: test.col_int32 IS NOT NULL\ + \n TableScan: test projection=[col_int32]\ + \n SubqueryAlias: c\ + \n Filter: test.col_int32 IS NOT NULL\ + \n TableScan: test projection=[col_int32]"; + assert_eq!(expected, format!("{plan:?}")); +} + +#[test] +// issue: https://github.com/apache/datafusion/issues/5334 +fn test_same_name_but_not_ambiguous() { + let sql = "SELECT t1.col_int32 AS col_int32 FROM test t1 intersect SELECT col_int32 FROM test t2"; + let plan = test_sql(sql).unwrap(); + let expected = "LeftSemi Join: t1.col_int32 = t2.col_int32\ + \n Aggregate: groupBy=[[t1.col_int32]], aggr=[[]]\ + \n SubqueryAlias: t1\ + \n TableScan: test projection=[col_int32]\ + \n SubqueryAlias: t2\ + \n TableScan: test projection=[col_int32]"; + assert_eq!(expected, format!("{plan:?}")); +} + +#[test] +fn test_propagate_empty_relation_inner_join_and_unions() { + let sql = "\ + SELECT A.col_int32 FROM test AS A \ + INNER JOIN ( \ + SELECT col_int32 FROM test WHERE 1 = 0 \ + ) AS B ON A.col_int32 = B.col_int32 \ + UNION ALL \ + SELECT test.col_int32 FROM test WHERE 1 = 1 \ + UNION ALL \ + SELECT test.col_int32 FROM test WHERE 0 = 0 \ + UNION ALL \ + SELECT test.col_int32 FROM test WHERE test.col_int32 < 0 \ + UNION ALL \ + SELECT test.col_int32 FROM test WHERE 1 = 0"; + + let plan = test_sql(sql).unwrap(); + let expected = "\ + Union\ + \n TableScan: test projection=[col_int32]\ + \n TableScan: test projection=[col_int32]\ + \n Filter: test.col_int32 < Int32(0)\ + \n TableScan: test projection=[col_int32]"; + assert_eq!(expected, format!("{plan:?}")); +} + +#[test] +fn semi_join_with_join_filter() -> Result<()> { + // regression test for https://github.com/apache/datafusion/issues/2888 + let sql = "SELECT col_utf8 FROM test WHERE EXISTS (\ + SELECT col_utf8 FROM test t2 WHERE test.col_int32 = t2.col_int32 \ + AND test.col_uint32 != t2.col_uint32)"; + let plan = test_sql(sql)?; + let expected = "Projection: test.col_utf8\ + \n LeftSemi Join: test.col_int32 = __correlated_sq_1.col_int32 Filter: test.col_uint32 != __correlated_sq_1.col_uint32\ + \n TableScan: test projection=[col_int32, col_uint32, col_utf8]\ + \n SubqueryAlias: __correlated_sq_1\ + \n SubqueryAlias: t2\ + \n TableScan: test projection=[col_int32, col_uint32]"; + assert_eq!(expected, format!("{plan:?}")); + Ok(()) +} + fn quick_test(sql: &str, expected_plan: &str) { let plan = test_sql(sql).unwrap(); assert_eq!(expected_plan, format!("{:?}", plan)); @@ -132,7 +266,8 @@ fn test_sql(sql: &str) -> Result { .with_udf(datafusion_functions::core::arrow_cast()) .with_udf(datafusion_functions::string::concat()) .with_udf(datafusion_functions::string::concat_ws()); - let sql_to_rel = SqlToRel::new(&context_provider); + let sql_to_rel = SqlToRel::new(&context_provider) + .with_user_defined_planner(Arc::new(CoreFunctionPlanner::default())); let plan = sql_to_rel.sql_statement_to_plan(statement.clone()).unwrap(); // hard code the return value of now() diff --git a/datafusion/optimizer/tests/optimizer_integration.rs b/datafusion/optimizer/tests/optimizer_integration.rs index 3b4e4c718313..9f0a65f0a764 100644 --- a/datafusion/optimizer/tests/optimizer_integration.rs +++ b/datafusion/optimizer/tests/optimizer_integration.rs @@ -25,7 +25,6 @@ use datafusion_common::config::ConfigOptions; use datafusion_common::{plan_err, Result}; use datafusion_expr::test::function_stub::sum_udaf; use datafusion_expr::{AggregateUDF, LogicalPlan, ScalarUDF, TableSource, WindowUDF}; -use datafusion_functions::core::planner::CoreFunctionPlanner; use datafusion_functions_aggregate::average::avg_udaf; use datafusion_functions_aggregate::count::count_udaf; use datafusion_optimizer::analyzer::Analyzer; @@ -116,55 +115,6 @@ fn distribute_by() -> Result<()> { Ok(()) } -#[test] -fn semi_join_with_join_filter() -> Result<()> { - // regression test for https://github.com/apache/datafusion/issues/2888 - let sql = "SELECT col_utf8 FROM test WHERE EXISTS (\ - SELECT col_utf8 FROM test t2 WHERE test.col_int32 = t2.col_int32 \ - AND test.col_uint32 != t2.col_uint32)"; - let plan = test_sql(sql)?; - let expected = "Projection: test.col_utf8\ - \n LeftSemi Join: test.col_int32 = __correlated_sq_1.col_int32 Filter: test.col_uint32 != __correlated_sq_1.col_uint32\ - \n TableScan: test projection=[col_int32, col_uint32, col_utf8]\ - \n SubqueryAlias: __correlated_sq_1\ - \n SubqueryAlias: t2\ - \n TableScan: test projection=[col_int32, col_uint32]"; - assert_eq!(expected, format!("{plan:?}")); - Ok(()) -} - -#[test] -fn anti_join_with_join_filter() -> Result<()> { - // regression test for https://github.com/apache/datafusion/issues/2888 - let sql = "SELECT col_utf8 FROM test WHERE NOT EXISTS (\ - SELECT col_utf8 FROM test t2 WHERE test.col_int32 = t2.col_int32 \ - AND test.col_uint32 != t2.col_uint32)"; - let plan = test_sql(sql)?; - let expected = "Projection: test.col_utf8\ - \n LeftAnti Join: test.col_int32 = __correlated_sq_1.col_int32 Filter: test.col_uint32 != __correlated_sq_1.col_uint32\ - \n TableScan: test projection=[col_int32, col_uint32, col_utf8]\ - \n SubqueryAlias: __correlated_sq_1\ - \n SubqueryAlias: t2\ - \n TableScan: test projection=[col_int32, col_uint32]"; - assert_eq!(expected, format!("{plan:?}")); - Ok(()) -} - -#[test] -fn where_exists_distinct() -> Result<()> { - let sql = "SELECT col_int32 FROM test WHERE EXISTS (\ - SELECT DISTINCT col_int32 FROM test t2 WHERE test.col_int32 = t2.col_int32)"; - let plan = test_sql(sql)?; - let expected = "LeftSemi Join: test.col_int32 = __correlated_sq_1.col_int32\ - \n TableScan: test projection=[col_int32]\ - \n SubqueryAlias: __correlated_sq_1\ - \n Aggregate: groupBy=[[t2.col_int32]], aggr=[[]]\ - \n SubqueryAlias: t2\ - \n TableScan: test projection=[col_int32]"; - assert_eq!(expected, format!("{plan:?}")); - Ok(()) -} - #[test] fn intersect() -> Result<()> { let sql = "SELECT col_int32, col_utf8 FROM test \ @@ -211,50 +161,6 @@ fn between_date64_plus_interval() -> Result<()> { Ok(()) } -#[test] -fn propagate_empty_relation() { - let sql = "SELECT test.col_int32 FROM test JOIN ( SELECT col_int32 FROM test WHERE false ) AS ta1 ON test.col_int32 = ta1.col_int32;"; - let plan = test_sql(sql).unwrap(); - // when children exist EmptyRelation, it will bottom-up propagate. - let expected = "EmptyRelation"; - assert_eq!(expected, format!("{plan:?}")); -} - -#[test] -fn join_keys_in_subquery_alias() { - let sql = "SELECT * FROM test AS A, ( SELECT col_int32 as key FROM test ) AS B where A.col_int32 = B.key;"; - let plan = test_sql(sql).unwrap(); - let expected = "Inner Join: a.col_int32 = b.key\ - \n SubqueryAlias: a\ - \n Filter: test.col_int32 IS NOT NULL\ - \n TableScan: test projection=[col_int32, col_uint32, col_utf8, col_date32, col_date64, col_ts_nano_none, col_ts_nano_utc]\ - \n SubqueryAlias: b\ - \n Projection: test.col_int32 AS key\ - \n Filter: test.col_int32 IS NOT NULL\ - \n TableScan: test projection=[col_int32]"; - - assert_eq!(expected, format!("{plan:?}")); -} - -#[test] -fn join_keys_in_subquery_alias_1() { - let sql = "SELECT * FROM test AS A, ( SELECT test.col_int32 AS key FROM test JOIN test AS C on test.col_int32 = C.col_int32 ) AS B where A.col_int32 = B.key;"; - let plan = test_sql(sql).unwrap(); - let expected = "Inner Join: a.col_int32 = b.key\ - \n SubqueryAlias: a\ - \n Filter: test.col_int32 IS NOT NULL\ - \n TableScan: test projection=[col_int32, col_uint32, col_utf8, col_date32, col_date64, col_ts_nano_none, col_ts_nano_utc]\ - \n SubqueryAlias: b\ - \n Projection: test.col_int32 AS key\ - \n Inner Join: test.col_int32 = c.col_int32\ - \n Filter: test.col_int32 IS NOT NULL\ - \n TableScan: test projection=[col_int32]\ - \n SubqueryAlias: c\ - \n Filter: test.col_int32 IS NOT NULL\ - \n TableScan: test projection=[col_int32]"; - assert_eq!(expected, format!("{plan:?}")); -} - #[test] fn push_down_filter_groupby_expr_contains_alias() { let sql = "SELECT * FROM (SELECT (col_int32 + col_uint32) AS c, count(*) FROM test GROUP BY 1) where c > 3"; @@ -266,20 +172,6 @@ fn push_down_filter_groupby_expr_contains_alias() { assert_eq!(expected, format!("{plan:?}")); } -#[test] -// issue: https://github.com/apache/datafusion/issues/5334 -fn test_same_name_but_not_ambiguous() { - let sql = "SELECT t1.col_int32 AS col_int32 FROM test t1 intersect SELECT col_int32 FROM test t2"; - let plan = test_sql(sql).unwrap(); - let expected = "LeftSemi Join: t1.col_int32 = t2.col_int32\ - \n Aggregate: groupBy=[[t1.col_int32]], aggr=[[]]\ - \n SubqueryAlias: t1\ - \n TableScan: test projection=[col_int32]\ - \n SubqueryAlias: t2\ - \n TableScan: test projection=[col_int32]"; - assert_eq!(expected, format!("{plan:?}")); -} - #[test] fn eliminate_nested_filters() { let sql = "\ @@ -310,32 +202,6 @@ fn eliminate_redundant_null_check_on_count() { assert_eq!(expected, format!("{plan:?}")); } -#[test] -fn test_propagate_empty_relation_inner_join_and_unions() { - let sql = "\ - SELECT A.col_int32 FROM test AS A \ - INNER JOIN ( \ - SELECT col_int32 FROM test WHERE 1 = 0 \ - ) AS B ON A.col_int32 = B.col_int32 \ - UNION ALL \ - SELECT test.col_int32 FROM test WHERE 1 = 1 \ - UNION ALL \ - SELECT test.col_int32 FROM test WHERE 0 = 0 \ - UNION ALL \ - SELECT test.col_int32 FROM test WHERE test.col_int32 < 0 \ - UNION ALL \ - SELECT test.col_int32 FROM test WHERE 1 = 0"; - - let plan = test_sql(sql).unwrap(); - let expected = "\ - Union\ - \n TableScan: test projection=[col_int32]\ - \n TableScan: test projection=[col_int32]\ - \n Filter: test.col_int32 < Int32(0)\ - \n TableScan: test projection=[col_int32]"; - assert_eq!(expected, format!("{plan:?}")); -} - fn test_sql(sql: &str) -> Result { // parse the SQL let dialect = GenericDialect {}; // or AnsiDialect, or your own dialect ... @@ -345,8 +211,7 @@ fn test_sql(sql: &str) -> Result { .with_udaf(sum_udaf()) .with_udaf(count_udaf()) .with_udaf(avg_udaf()); - let sql_to_rel = SqlToRel::new(&context_provider) - .with_user_defined_planner(Arc::new(CoreFunctionPlanner::default())); + let sql_to_rel = SqlToRel::new(&context_provider); let plan = sql_to_rel.sql_statement_to_plan(statement.clone()).unwrap(); let config = OptimizerContext::new().with_skip_failing_rules(false); From 73cf5fa553a87af4ffe11e02cb10f537e9ac2f44 Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Thu, 18 Jul 2024 10:28:55 +0530 Subject: [PATCH 08/18] fix breaking tc's --- datafusion/core/tests/optimizer_integration.rs | 16 +++++++++++++--- datafusion/sql/examples/sql.rs | 8 ++++---- datafusion/sql/src/expr/identifier.rs | 2 +- datafusion/sql/tests/cases/plan_to_sql.rs | 18 +++++++++--------- datafusion/sql/tests/common/mod.rs | 11 +++++++++++ datafusion/sql/tests/sql_integration.rs | 6 +++--- 6 files changed, 41 insertions(+), 20 deletions(-) diff --git a/datafusion/core/tests/optimizer_integration.rs b/datafusion/core/tests/optimizer_integration.rs index 5c6d0c673aa4..fd14bfb82ea8 100644 --- a/datafusion/core/tests/optimizer_integration.rs +++ b/datafusion/core/tests/optimizer_integration.rs @@ -44,6 +44,7 @@ use datafusion_sql::sqlparser::parser::Parser; use datafusion_sql::TableReference; use chrono::DateTime; +use datafusion_expr::planner::ExprPlanner; use datafusion_functions::core::planner::CoreFunctionPlanner; use datafusion_functions::datetime; @@ -265,9 +266,9 @@ fn test_sql(sql: &str) -> Result { .with_udf(datetime::now()) .with_udf(datafusion_functions::core::arrow_cast()) .with_udf(datafusion_functions::string::concat()) - .with_udf(datafusion_functions::string::concat_ws()); - let sql_to_rel = SqlToRel::new(&context_provider) - .with_user_defined_planner(Arc::new(CoreFunctionPlanner::default())); + .with_udf(datafusion_functions::string::concat_ws()) + .with_expr_planner(Arc::new(CoreFunctionPlanner::default())); + let sql_to_rel = SqlToRel::new(&context_provider); let plan = sql_to_rel.sql_statement_to_plan(statement.clone()).unwrap(); // hard code the return value of now() @@ -286,6 +287,7 @@ fn test_sql(sql: &str) -> Result { struct MyContextProvider { options: ConfigOptions, udfs: HashMap>, + expr_planners: Vec>, } impl MyContextProvider { @@ -293,6 +295,10 @@ impl MyContextProvider { self.udfs.insert(udf.name().to_string(), udf); self } + fn with_expr_planner(mut self, planner: Arc) -> Self { + self.expr_planners.push(planner); + self + } } impl ContextProvider for MyContextProvider { @@ -361,6 +367,10 @@ impl ContextProvider for MyContextProvider { fn udwf_names(&self) -> Vec { Vec::new() } + + fn get_expr_planners(&self) -> &[Arc] { + &self.expr_planners + } } struct MyTableSource { diff --git a/datafusion/sql/examples/sql.rs b/datafusion/sql/examples/sql.rs index 8fbadb4f1e08..6d4b9f008b0c 100644 --- a/datafusion/sql/examples/sql.rs +++ b/datafusion/sql/examples/sql.rs @@ -15,14 +15,16 @@ // specific language governing permissions and limitations // under the License. +use std::{collections::HashMap, sync::Arc}; + use arrow_schema::{DataType, Field, Schema}; + use datafusion_common::config::ConfigOptions; use datafusion_common::{plan_err, Result}; use datafusion_expr::WindowUDF; use datafusion_expr::{ logical_plan::builder::LogicalTableSource, AggregateUDF, ScalarUDF, TableSource, }; -use datafusion_functions::core::planner::CoreFunctionPlanner; use datafusion_functions_aggregate::count::count_udaf; use datafusion_functions_aggregate::sum::sum_udaf; use datafusion_sql::{ @@ -30,7 +32,6 @@ use datafusion_sql::{ sqlparser::{dialect::GenericDialect, parser::Parser}, TableReference, }; -use std::{collections::HashMap, sync::Arc}; fn main() { let sql = "SELECT \ @@ -55,8 +56,7 @@ fn main() { let context_provider = MyContextProvider::new() .with_udaf(sum_udaf()) .with_udaf(count_udaf()); - let sql_to_rel = SqlToRel::new(&context_provider) - .with_user_defined_planner(Arc::new(CoreFunctionPlanner::default())); + let sql_to_rel = SqlToRel::new(&context_provider); let plan = sql_to_rel.sql_statement_to_plan(statement.clone()).unwrap(); // show the plan diff --git a/datafusion/sql/src/expr/identifier.rs b/datafusion/sql/src/expr/identifier.rs index 038e974ad613..450af7c6d424 100644 --- a/datafusion/sql/src/expr/identifier.rs +++ b/datafusion/sql/src/expr/identifier.rs @@ -128,7 +128,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { match search_result { // found matching field with spare identifier(s) for nested field(s) in structure Some((field, qualifier, nested_names)) => { - for planner in self.planners.iter() { + for planner in self.context_provider.get_expr_planners() { match planner.plan_compound_identifier( field, qualifier, diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index 22043e09e745..66f568224c3b 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -157,9 +157,9 @@ fn roundtrip_statement() -> Result<()> { let context = MockContextProvider::default() .with_udaf(sum_udaf()) - .with_udaf(count_udaf()); - let sql_to_rel = SqlToRel::new(&context) - .with_user_defined_planner(Arc::new(CoreFunctionPlanner::default())); + .with_udaf(count_udaf()) + .with_expr_planner(Arc::new(CoreFunctionPlanner::default())); + let sql_to_rel = SqlToRel::new(&context); let plan = sql_to_rel.sql_statement_to_plan(statement).unwrap(); let roundtrip_statement = plan_to_sql(&plan)?; @@ -187,9 +187,9 @@ fn roundtrip_crossjoin() -> Result<()> { .try_with_sql(query)? .parse_statement()?; - let context = MockContextProvider::default(); - let sql_to_rel = SqlToRel::new(&context) - .with_user_defined_planner(Arc::new(CoreFunctionPlanner::default())); + let context = MockContextProvider::default() + .with_expr_planner(Arc::new(CoreFunctionPlanner::default())); + let sql_to_rel = SqlToRel::new(&context); let plan = sql_to_rel.sql_statement_to_plan(statement).unwrap(); let roundtrip_statement = plan_to_sql(&plan)?; @@ -251,9 +251,9 @@ fn roundtrip_statement_with_dialect() -> Result<()> { .try_with_sql(query.sql)? .parse_statement()?; - let context = MockContextProvider::default(); - let sql_to_rel = SqlToRel::new(&context) - .with_user_defined_planner(Arc::new(CoreFunctionPlanner::default())); + let context = MockContextProvider::default() + .with_expr_planner(Arc::new(CoreFunctionPlanner::default())); + let sql_to_rel = SqlToRel::new(&context); let plan = sql_to_rel .sql_statement_to_plan(statement) .unwrap_or_else(|e| panic!("Failed to parse sql: {}\n{e}", query.sql)); diff --git a/datafusion/sql/tests/common/mod.rs b/datafusion/sql/tests/common/mod.rs index b8d8bd12d28b..71bbd55a15b3 100644 --- a/datafusion/sql/tests/common/mod.rs +++ b/datafusion/sql/tests/common/mod.rs @@ -25,6 +25,7 @@ use arrow_schema::*; use datafusion_common::config::ConfigOptions; use datafusion_common::file_options::file_type::FileType; use datafusion_common::{plan_err, GetExt, Result, TableReference}; +use datafusion_expr::planner::ExprPlanner; use datafusion_expr::{AggregateUDF, ScalarUDF, TableSource, WindowUDF}; use datafusion_sql::planner::ContextProvider; @@ -53,6 +54,7 @@ pub(crate) struct MockContextProvider { options: ConfigOptions, udfs: HashMap>, udafs: HashMap>, + expr_planners: Vec>, } impl MockContextProvider { @@ -73,6 +75,11 @@ impl MockContextProvider { self.udafs.insert(udaf.name().to_lowercase(), udaf); self } + + pub(crate) fn with_expr_planner(mut self, planner: Arc) -> Self { + self.expr_planners.push(planner); + self + } } impl ContextProvider for MockContextProvider { @@ -240,6 +247,10 @@ impl ContextProvider for MockContextProvider { fn udwf_names(&self) -> Vec { Vec::new() } + + fn get_expr_planners(&self) -> &[Arc] { + return self.expr_planners.as_ref(); + } } struct EmptyTable { diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index 8a58c770246e..b4f8b4e6d01c 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -2696,10 +2696,10 @@ fn logical_plan_with_dialect_and_options( .with_udaf(approx_median_udaf()) .with_udaf(count_udaf()) .with_udaf(avg_udaf()) - .with_udaf(grouping_udaf()); + .with_udaf(grouping_udaf()) + .with_expr_planner(Arc::new(CoreFunctionPlanner::default())); - let planner = SqlToRel::new_with_options(&context, options) - .with_user_defined_planner(Arc::new(CoreFunctionPlanner::default())); + let planner = SqlToRel::new_with_options(&context, options); let result = DFParser::parse_sql_with_dialect(sql, dialect); let mut ast = result?; planner.statement_to_plan(ast.pop_front().unwrap()) From e4a7d7c175ad6213d83de1dab87372a55866e57b Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Thu, 18 Jul 2024 10:35:19 +0530 Subject: [PATCH 09/18] cleanup --- datafusion/sql/tests/common/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/sql/tests/common/mod.rs b/datafusion/sql/tests/common/mod.rs index 71bbd55a15b3..d9e672a842ce 100644 --- a/datafusion/sql/tests/common/mod.rs +++ b/datafusion/sql/tests/common/mod.rs @@ -249,7 +249,7 @@ impl ContextProvider for MockContextProvider { } fn get_expr_planners(&self) -> &[Arc] { - return self.expr_planners.as_ref(); + &self.expr_planners } } From 7c52b3dbaec9a933d60c258e26cf1578cb4af33b Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Thu, 18 Jul 2024 10:54:16 +0530 Subject: [PATCH 10/18] fix examples --- datafusion/sql/examples/sql.rs | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/datafusion/sql/examples/sql.rs b/datafusion/sql/examples/sql.rs index 6d4b9f008b0c..9047e2287f58 100644 --- a/datafusion/sql/examples/sql.rs +++ b/datafusion/sql/examples/sql.rs @@ -25,6 +25,8 @@ use datafusion_expr::WindowUDF; use datafusion_expr::{ logical_plan::builder::LogicalTableSource, AggregateUDF, ScalarUDF, TableSource, }; +use datafusion_expr::planner::ExprPlanner; +use datafusion_functions::core::planner::CoreFunctionPlanner; use datafusion_functions_aggregate::count::count_udaf; use datafusion_functions_aggregate::sum::sum_udaf; use datafusion_sql::{ @@ -55,7 +57,8 @@ fn main() { // create a logical query plan let context_provider = MyContextProvider::new() .with_udaf(sum_udaf()) - .with_udaf(count_udaf()); + .with_udaf(count_udaf()) + .with_expr_planner(Arc::new(CoreFunctionPlanner::default())); let sql_to_rel = SqlToRel::new(&context_provider); let plan = sql_to_rel.sql_statement_to_plan(statement.clone()).unwrap(); @@ -67,6 +70,7 @@ struct MyContextProvider { options: ConfigOptions, tables: HashMap>, udafs: HashMap>, + expr_planners: Vec>, } impl MyContextProvider { @@ -75,6 +79,11 @@ impl MyContextProvider { self } + fn with_expr_planner(mut self, planner: Arc) -> Self { + self.expr_planners.push(planner); + self + } + fn new() -> Self { let mut tables = HashMap::new(); tables.insert( @@ -107,6 +116,7 @@ impl MyContextProvider { tables, options: Default::default(), udafs: Default::default(), + expr_planners: vec![], } } } @@ -156,4 +166,8 @@ impl ContextProvider for MyContextProvider { fn udwf_names(&self) -> Vec { Vec::new() } + + fn get_expr_planners(&self) -> &[Arc] { + &self.expr_planners + } } From 2ab18e430ccba562893e9deb6ef1b718c10df107 Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Thu, 18 Jul 2024 11:26:26 +0530 Subject: [PATCH 11/18] formatting --- datafusion/sql/examples/sql.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/sql/examples/sql.rs b/datafusion/sql/examples/sql.rs index 9047e2287f58..d9ee1b4db8e2 100644 --- a/datafusion/sql/examples/sql.rs +++ b/datafusion/sql/examples/sql.rs @@ -21,11 +21,11 @@ use arrow_schema::{DataType, Field, Schema}; use datafusion_common::config::ConfigOptions; use datafusion_common::{plan_err, Result}; +use datafusion_expr::planner::ExprPlanner; use datafusion_expr::WindowUDF; use datafusion_expr::{ logical_plan::builder::LogicalTableSource, AggregateUDF, ScalarUDF, TableSource, }; -use datafusion_expr::planner::ExprPlanner; use datafusion_functions::core::planner::CoreFunctionPlanner; use datafusion_functions_aggregate::count::count_udaf; use datafusion_functions_aggregate::sum::sum_udaf; From 9378d15b6c8498c732b0dc0a901e0762a978121f Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Fri, 19 Jul 2024 11:12:16 +0530 Subject: [PATCH 12/18] rm datafusion-functions from optimizer --- datafusion-cli/Cargo.lock | 1 - datafusion/optimizer/Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index fc8d8a98aab9..cdf0e7f57316 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1342,7 +1342,6 @@ dependencies = [ "chrono", "datafusion-common", "datafusion-expr", - "datafusion-functions", "datafusion-physical-expr", "hashbrown 0.14.5", "indexmap 2.2.6", diff --git a/datafusion/optimizer/Cargo.toml b/datafusion/optimizer/Cargo.toml index 54d8c472f13f..1a9e9630c076 100644 --- a/datafusion/optimizer/Cargo.toml +++ b/datafusion/optimizer/Cargo.toml @@ -45,7 +45,6 @@ async-trait = { workspace = true } chrono = { workspace = true } datafusion-common = { workspace = true, default-features = true } datafusion-expr = { workspace = true } -datafusion-functions = { workspace = true } datafusion-physical-expr = { workspace = true } hashbrown = { workspace = true } indexmap = { workspace = true } From 1379cd716effc431232bdb9c1b45fc544f39f521 Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Fri, 19 Jul 2024 11:14:38 +0530 Subject: [PATCH 13/18] update compound identifier --- datafusion/expr/src/planner.rs | 9 +++---- datafusion/functions/src/core/planner.rs | 34 +++++++++--------------- 2 files changed, 16 insertions(+), 27 deletions(-) diff --git a/datafusion/expr/src/planner.rs b/datafusion/expr/src/planner.rs index fc9dc4c8d3a6..a388110c8316 100644 --- a/datafusion/expr/src/planner.rs +++ b/datafusion/expr/src/planner.rs @@ -174,10 +174,7 @@ pub trait ExprPlanner: Send + Sync { Ok(PlannerResult::Original(args)) } - fn plan_get_field(&self, args: Vec) -> Result>> { - Ok(PlannerResult::Original(args)) - } - /// Plans compound identifier eg `db.schema.table`. + /// Plans compound identifier eg `db.schema.table` for non-empty nested names /// /// Note: /// Currently compound identifier for outer query schema is not supported. @@ -185,11 +182,11 @@ pub trait ExprPlanner: Send + Sync { /// Returns empty expression arguments if not possible fn plan_compound_identifier( &self, - _filed: &Field, + _field: &Field, _qualifier: Option<&TableReference>, _nested_names: &[String], ) -> Result>> { - Ok(PlannerResult::Original(vec![])) + not_impl_err!("Default planner compound identifier hasn't been implemented for {self:?} yet") } } diff --git a/datafusion/functions/src/core/planner.rs b/datafusion/functions/src/core/planner.rs index b94da9b53892..889f191d592f 100644 --- a/datafusion/functions/src/core/planner.rs +++ b/datafusion/functions/src/core/planner.rs @@ -70,27 +70,19 @@ impl ExprPlanner for CoreFunctionPlanner { qualifier: Option<&TableReference>, nested_names: &[String], ) -> Result>> { - // found matching field with no spare identifier(s) - if nested_names.is_empty() { - Ok(PlannerResult::Planned(Expr::Column(Column::from(( - qualifier, field, - ))))) - } else { - // found matching field with spare identifier(s) for nested field(s) in structure - // TODO: remove when can support multiple nested identifiers - if nested_names.len() > 1 { - return not_impl_err!( - "Nested identifiers not yet supported for column {}", - Column::from((qualifier, field)).quoted_flat_name() - ); - } - let nested_name = nested_names[0].to_string(); - - let col = Expr::Column(Column::from((qualifier, field))); - let get_field_args = vec![col, lit(ScalarValue::from(nested_name))]; - Ok(PlannerResult::Planned(Expr::ScalarFunction( - ScalarFunction::new_udf(crate::core::get_field(), get_field_args), - ))) + // TODO: remove when can support multiple nested identifiers + if nested_names.len() > 1 { + return not_impl_err!( + "Nested identifiers not yet supported for column {}", + Column::from((qualifier, field)).quoted_flat_name() + ); } + let nested_name = nested_names[0].to_string(); + + let col = Expr::Column(Column::from((qualifier, field))); + let get_field_args = vec![col, lit(ScalarValue::from(nested_name))]; + Ok(PlannerResult::Planned(Expr::ScalarFunction( + ScalarFunction::new_udf(crate::core::get_field(), get_field_args), + ))) } } From d6c359011cc07538b46d541f95cf2043f9bc8c1a Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Fri, 19 Jul 2024 11:34:29 +0530 Subject: [PATCH 14/18] update planner --- datafusion/expr/src/planner.rs | 2 +- datafusion/sql/src/expr/identifier.rs | 17 ++++++++++++----- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/datafusion/expr/src/planner.rs b/datafusion/expr/src/planner.rs index a388110c8316..ebbc1a25bae5 100644 --- a/datafusion/expr/src/planner.rs +++ b/datafusion/expr/src/planner.rs @@ -179,7 +179,7 @@ pub trait ExprPlanner: Send + Sync { /// Note: /// Currently compound identifier for outer query schema is not supported. /// - /// Returns empty expression arguments if not possible + /// Returns planned expression fn plan_compound_identifier( &self, _field: &Field, diff --git a/datafusion/sql/src/expr/identifier.rs b/datafusion/sql/src/expr/identifier.rs index 450af7c6d424..f8979bde3086 100644 --- a/datafusion/sql/src/expr/identifier.rs +++ b/datafusion/sql/src/expr/identifier.rs @@ -127,21 +127,28 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let search_result = search_dfschema(&ids, schema); match search_result { // found matching field with spare identifier(s) for nested field(s) in structure - Some((field, qualifier, nested_names)) => { + Some((field, qualifier, nested_names)) if !nested_names.is_empty() => { + // found matching field with spare identifier(s) for nested field(s) in structure for planner in self.context_provider.get_expr_planners() { - match planner.plan_compound_identifier( + if let Ok(planner_result) = planner.plan_compound_identifier( field, qualifier, nested_names, - )? { - PlannerResult::Planned(expr) => return Ok(expr), - PlannerResult::Original(_args) => {} + ) { + match planner_result { + PlannerResult::Planned(expr) => return Ok(expr), + PlannerResult::Original(_args) => {} + } } } not_impl_err!( "Compound identifiers not supported by ExprPlanner: {ids:?}" ) } + // found matching field with no spare identifier(s) + Some((field, qualifier, _nested_names)) => { + Ok(Expr::Column(Column::from((qualifier, field)))) + } None => { // return default where use all identifiers to not have a nested field // this len check is because at 5 identifiers will have to have a nested field From d2fa19ae511ace3633c6571672b0558914489488 Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Fri, 19 Jul 2024 11:35:27 +0530 Subject: [PATCH 15/18] update planner --- datafusion/expr/src/planner.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/expr/src/planner.rs b/datafusion/expr/src/planner.rs index ebbc1a25bae5..cebec571ea38 100644 --- a/datafusion/expr/src/planner.rs +++ b/datafusion/expr/src/planner.rs @@ -186,7 +186,7 @@ pub trait ExprPlanner: Send + Sync { _qualifier: Option<&TableReference>, _nested_names: &[String], ) -> Result>> { - not_impl_err!("Default planner compound identifier hasn't been implemented for {self:?} yet") + not_impl_err!("Default planner compound identifier hasn't been implemented for ExprPlanner") } } From 3076d548d70aa14422cb8b696aca2c93a4ab97c0 Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Fri, 19 Jul 2024 11:40:49 +0530 Subject: [PATCH 16/18] formatting --- datafusion/expr/src/planner.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion/expr/src/planner.rs b/datafusion/expr/src/planner.rs index cebec571ea38..b976eabb1a6c 100644 --- a/datafusion/expr/src/planner.rs +++ b/datafusion/expr/src/planner.rs @@ -186,7 +186,9 @@ pub trait ExprPlanner: Send + Sync { _qualifier: Option<&TableReference>, _nested_names: &[String], ) -> Result>> { - not_impl_err!("Default planner compound identifier hasn't been implemented for ExprPlanner") + not_impl_err!( + "Default planner compound identifier hasn't been implemented for ExprPlanner" + ) } } From 40ab63c7efebef737119a8f65bdbcbd2998e1fb7 Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Sat, 20 Jul 2024 17:10:03 +0530 Subject: [PATCH 17/18] reverting optimizer tests --- .../core/tests/optimizer_integration.rs | 157 +----------------- .../optimizer/tests/optimizer_integration.rs | 135 ++++++++++++++- 2 files changed, 140 insertions(+), 152 deletions(-) diff --git a/datafusion/core/tests/optimizer_integration.rs b/datafusion/core/tests/optimizer_integration.rs index fd14bfb82ea8..3214c40bf220 100644 --- a/datafusion/core/tests/optimizer_integration.rs +++ b/datafusion/core/tests/optimizer_integration.rs @@ -44,8 +44,6 @@ use datafusion_sql::sqlparser::parser::Parser; use datafusion_sql::TableReference; use chrono::DateTime; -use datafusion_expr::planner::ExprPlanner; -use datafusion_functions::core::planner::CoreFunctionPlanner; use datafusion_functions::datetime; #[cfg(test)] @@ -117,139 +115,6 @@ fn concat_ws_literals() -> Result<()> { Ok(()) } -#[test] -fn anti_join_with_join_filter() -> Result<()> { - // regression test for https://github.com/apache/datafusion/issues/2888 - let sql = "SELECT col_utf8 FROM test WHERE NOT EXISTS (\ - SELECT col_utf8 FROM test t2 WHERE test.col_int32 = t2.col_int32 \ - AND test.col_uint32 != t2.col_uint32)"; - let plan = test_sql(sql)?; - let expected = "Projection: test.col_utf8\ - \n LeftAnti Join: test.col_int32 = __correlated_sq_1.col_int32 Filter: test.col_uint32 != __correlated_sq_1.col_uint32\ - \n TableScan: test projection=[col_int32, col_uint32, col_utf8]\ - \n SubqueryAlias: __correlated_sq_1\ - \n SubqueryAlias: t2\ - \n TableScan: test projection=[col_int32, col_uint32]"; - assert_eq!(expected, format!("{plan:?}")); - Ok(()) -} - -#[test] -fn where_exists_distinct() -> Result<()> { - let sql = "SELECT col_int32 FROM test WHERE EXISTS (\ - SELECT DISTINCT col_int32 FROM test t2 WHERE test.col_int32 = t2.col_int32)"; - let plan = test_sql(sql)?; - let expected = "LeftSemi Join: test.col_int32 = __correlated_sq_1.col_int32\ - \n TableScan: test projection=[col_int32]\ - \n SubqueryAlias: __correlated_sq_1\ - \n Aggregate: groupBy=[[t2.col_int32]], aggr=[[]]\ - \n SubqueryAlias: t2\ - \n TableScan: test projection=[col_int32]"; - assert_eq!(expected, format!("{plan:?}")); - Ok(()) -} - -#[test] -fn propagate_empty_relation() { - let sql = "SELECT test.col_int32 FROM test JOIN ( SELECT col_int32 FROM test WHERE false ) AS ta1 ON test.col_int32 = ta1.col_int32;"; - let plan = test_sql(sql).unwrap(); - // when children exist EmptyRelation, it will bottom-up propagate. - let expected = "EmptyRelation"; - assert_eq!(expected, format!("{plan:?}")); -} - -#[test] -fn join_keys_in_subquery_alias() { - let sql = "SELECT * FROM test AS A, ( SELECT col_int32 as key FROM test ) AS B where A.col_int32 = B.key;"; - let plan = test_sql(sql).unwrap(); - let expected = "Inner Join: a.col_int32 = b.key\ - \n SubqueryAlias: a\ - \n Filter: test.col_int32 IS NOT NULL\ - \n TableScan: test projection=[col_int32, col_uint32, col_utf8, col_date32, col_date64, col_ts_nano_none, col_ts_nano_utc]\ - \n SubqueryAlias: b\ - \n Projection: test.col_int32 AS key\ - \n Filter: test.col_int32 IS NOT NULL\ - \n TableScan: test projection=[col_int32]"; - - assert_eq!(expected, format!("{plan:?}")); -} - -#[test] -fn join_keys_in_subquery_alias_1() { - let sql = "SELECT * FROM test AS A, ( SELECT test.col_int32 AS key FROM test JOIN test AS C on test.col_int32 = C.col_int32 ) AS B where A.col_int32 = B.key;"; - let plan = test_sql(sql).unwrap(); - let expected = "Inner Join: a.col_int32 = b.key\ - \n SubqueryAlias: a\ - \n Filter: test.col_int32 IS NOT NULL\ - \n TableScan: test projection=[col_int32, col_uint32, col_utf8, col_date32, col_date64, col_ts_nano_none, col_ts_nano_utc]\ - \n SubqueryAlias: b\ - \n Projection: test.col_int32 AS key\ - \n Inner Join: test.col_int32 = c.col_int32\ - \n Filter: test.col_int32 IS NOT NULL\ - \n TableScan: test projection=[col_int32]\ - \n SubqueryAlias: c\ - \n Filter: test.col_int32 IS NOT NULL\ - \n TableScan: test projection=[col_int32]"; - assert_eq!(expected, format!("{plan:?}")); -} - -#[test] -// issue: https://github.com/apache/datafusion/issues/5334 -fn test_same_name_but_not_ambiguous() { - let sql = "SELECT t1.col_int32 AS col_int32 FROM test t1 intersect SELECT col_int32 FROM test t2"; - let plan = test_sql(sql).unwrap(); - let expected = "LeftSemi Join: t1.col_int32 = t2.col_int32\ - \n Aggregate: groupBy=[[t1.col_int32]], aggr=[[]]\ - \n SubqueryAlias: t1\ - \n TableScan: test projection=[col_int32]\ - \n SubqueryAlias: t2\ - \n TableScan: test projection=[col_int32]"; - assert_eq!(expected, format!("{plan:?}")); -} - -#[test] -fn test_propagate_empty_relation_inner_join_and_unions() { - let sql = "\ - SELECT A.col_int32 FROM test AS A \ - INNER JOIN ( \ - SELECT col_int32 FROM test WHERE 1 = 0 \ - ) AS B ON A.col_int32 = B.col_int32 \ - UNION ALL \ - SELECT test.col_int32 FROM test WHERE 1 = 1 \ - UNION ALL \ - SELECT test.col_int32 FROM test WHERE 0 = 0 \ - UNION ALL \ - SELECT test.col_int32 FROM test WHERE test.col_int32 < 0 \ - UNION ALL \ - SELECT test.col_int32 FROM test WHERE 1 = 0"; - - let plan = test_sql(sql).unwrap(); - let expected = "\ - Union\ - \n TableScan: test projection=[col_int32]\ - \n TableScan: test projection=[col_int32]\ - \n Filter: test.col_int32 < Int32(0)\ - \n TableScan: test projection=[col_int32]"; - assert_eq!(expected, format!("{plan:?}")); -} - -#[test] -fn semi_join_with_join_filter() -> Result<()> { - // regression test for https://github.com/apache/datafusion/issues/2888 - let sql = "SELECT col_utf8 FROM test WHERE EXISTS (\ - SELECT col_utf8 FROM test t2 WHERE test.col_int32 = t2.col_int32 \ - AND test.col_uint32 != t2.col_uint32)"; - let plan = test_sql(sql)?; - let expected = "Projection: test.col_utf8\ - \n LeftSemi Join: test.col_int32 = __correlated_sq_1.col_int32 Filter: test.col_uint32 != __correlated_sq_1.col_uint32\ - \n TableScan: test projection=[col_int32, col_uint32, col_utf8]\ - \n SubqueryAlias: __correlated_sq_1\ - \n SubqueryAlias: t2\ - \n TableScan: test projection=[col_int32, col_uint32]"; - assert_eq!(expected, format!("{plan:?}")); - Ok(()) -} - fn quick_test(sql: &str, expected_plan: &str) { let plan = test_sql(sql).unwrap(); assert_eq!(expected_plan, format!("{:?}", plan)); @@ -266,8 +131,7 @@ fn test_sql(sql: &str) -> Result { .with_udf(datetime::now()) .with_udf(datafusion_functions::core::arrow_cast()) .with_udf(datafusion_functions::string::concat()) - .with_udf(datafusion_functions::string::concat_ws()) - .with_expr_planner(Arc::new(CoreFunctionPlanner::default())); + .with_udf(datafusion_functions::string::concat_ws()); let sql_to_rel = SqlToRel::new(&context_provider); let plan = sql_to_rel.sql_statement_to_plan(statement.clone()).unwrap(); @@ -287,7 +151,6 @@ fn test_sql(sql: &str) -> Result { struct MyContextProvider { options: ConfigOptions, udfs: HashMap>, - expr_planners: Vec>, } impl MyContextProvider { @@ -295,10 +158,6 @@ impl MyContextProvider { self.udfs.insert(udf.name().to_string(), udf); self } - fn with_expr_planner(mut self, planner: Arc) -> Self { - self.expr_planners.push(planner); - self - } } impl ContextProvider for MyContextProvider { @@ -367,10 +226,6 @@ impl ContextProvider for MyContextProvider { fn udwf_names(&self) -> Vec { Vec::new() } - - fn get_expr_planners(&self) -> &[Arc] { - &self.expr_planners - } } struct MyTableSource { @@ -406,7 +261,7 @@ fn test_nested_schema_nullability() { vec![Some("table_name".into()), None], &Arc::new(schema), ) - .unwrap(); + .unwrap(); let expr = col("parent").field("child"); assert!(expr.nullable(&dfschema).unwrap()); @@ -479,9 +334,9 @@ fn test_inequalities_non_null_bounded() { } fn validate_simplified_cases(rewriter: &mut GuaranteeRewriter, cases: &[(Expr, T)]) -where - ScalarValue: From, - T: Clone, + where + ScalarValue: From, + T: Clone, { for (expr, expected_value) in cases { let output = expr.clone().rewrite(rewriter).data().unwrap(); @@ -502,4 +357,4 @@ fn validate_unchanged_cases(rewriter: &mut GuaranteeRewriter, cases: &[Expr]) { expr, output ); } -} +} \ No newline at end of file diff --git a/datafusion/optimizer/tests/optimizer_integration.rs b/datafusion/optimizer/tests/optimizer_integration.rs index 9f0a65f0a764..150a2f2d6eb3 100644 --- a/datafusion/optimizer/tests/optimizer_integration.rs +++ b/datafusion/optimizer/tests/optimizer_integration.rs @@ -115,6 +115,55 @@ fn distribute_by() -> Result<()> { Ok(()) } +#[test] +fn semi_join_with_join_filter() -> Result<()> { + // regression test for https://github.com/apache/datafusion/issues/2888 + let sql = "SELECT col_utf8 FROM test WHERE EXISTS (\ + SELECT col_utf8 FROM test t2 WHERE test.col_int32 = t2.col_int32 \ + AND test.col_uint32 != t2.col_uint32)"; + let plan = test_sql(sql)?; + let expected = "Projection: test.col_utf8\ + \n LeftSemi Join: test.col_int32 = __correlated_sq_1.col_int32 Filter: test.col_uint32 != __correlated_sq_1.col_uint32\ + \n TableScan: test projection=[col_int32, col_uint32, col_utf8]\ + \n SubqueryAlias: __correlated_sq_1\ + \n SubqueryAlias: t2\ + \n TableScan: test projection=[col_int32, col_uint32]"; + assert_eq!(expected, format!("{plan:?}")); + Ok(()) +} + +#[test] +fn anti_join_with_join_filter() -> Result<()> { + // regression test for https://github.com/apache/datafusion/issues/2888 + let sql = "SELECT col_utf8 FROM test WHERE NOT EXISTS (\ + SELECT col_utf8 FROM test t2 WHERE test.col_int32 = t2.col_int32 \ + AND test.col_uint32 != t2.col_uint32)"; + let plan = test_sql(sql)?; + let expected = "Projection: test.col_utf8\ + \n LeftAnti Join: test.col_int32 = __correlated_sq_1.col_int32 Filter: test.col_uint32 != __correlated_sq_1.col_uint32\ + \n TableScan: test projection=[col_int32, col_uint32, col_utf8]\ + \n SubqueryAlias: __correlated_sq_1\ + \n SubqueryAlias: t2\ + \n TableScan: test projection=[col_int32, col_uint32]"; + assert_eq!(expected, format!("{plan:?}")); + Ok(()) +} + +#[test] +fn where_exists_distinct() -> Result<()> { + let sql = "SELECT col_int32 FROM test WHERE EXISTS (\ + SELECT DISTINCT col_int32 FROM test t2 WHERE test.col_int32 = t2.col_int32)"; + let plan = test_sql(sql)?; + let expected = "LeftSemi Join: test.col_int32 = __correlated_sq_1.col_int32\ + \n TableScan: test projection=[col_int32]\ + \n SubqueryAlias: __correlated_sq_1\ + \n Aggregate: groupBy=[[t2.col_int32]], aggr=[[]]\ + \n SubqueryAlias: t2\ + \n TableScan: test projection=[col_int32]"; + assert_eq!(expected, format!("{plan:?}")); + Ok(()) +} + #[test] fn intersect() -> Result<()> { let sql = "SELECT col_int32, col_utf8 FROM test \ @@ -161,6 +210,50 @@ fn between_date64_plus_interval() -> Result<()> { Ok(()) } +#[test] +fn propagate_empty_relation() { + let sql = "SELECT test.col_int32 FROM test JOIN ( SELECT col_int32 FROM test WHERE false ) AS ta1 ON test.col_int32 = ta1.col_int32;"; + let plan = test_sql(sql).unwrap(); + // when children exist EmptyRelation, it will bottom-up propagate. + let expected = "EmptyRelation"; + assert_eq!(expected, format!("{plan:?}")); +} + +#[test] +fn join_keys_in_subquery_alias() { + let sql = "SELECT * FROM test AS A, ( SELECT col_int32 as key FROM test ) AS B where A.col_int32 = B.key;"; + let plan = test_sql(sql).unwrap(); + let expected = "Inner Join: a.col_int32 = b.key\ + \n SubqueryAlias: a\ + \n Filter: test.col_int32 IS NOT NULL\ + \n TableScan: test projection=[col_int32, col_uint32, col_utf8, col_date32, col_date64, col_ts_nano_none, col_ts_nano_utc]\ + \n SubqueryAlias: b\ + \n Projection: test.col_int32 AS key\ + \n Filter: test.col_int32 IS NOT NULL\ + \n TableScan: test projection=[col_int32]"; + + assert_eq!(expected, format!("{plan:?}")); +} + +#[test] +fn join_keys_in_subquery_alias_1() { + let sql = "SELECT * FROM test AS A, ( SELECT test.col_int32 AS key FROM test JOIN test AS C on test.col_int32 = C.col_int32 ) AS B where A.col_int32 = B.key;"; + let plan = test_sql(sql).unwrap(); + let expected = "Inner Join: a.col_int32 = b.key\ + \n SubqueryAlias: a\ + \n Filter: test.col_int32 IS NOT NULL\ + \n TableScan: test projection=[col_int32, col_uint32, col_utf8, col_date32, col_date64, col_ts_nano_none, col_ts_nano_utc]\ + \n SubqueryAlias: b\ + \n Projection: test.col_int32 AS key\ + \n Inner Join: test.col_int32 = c.col_int32\ + \n Filter: test.col_int32 IS NOT NULL\ + \n TableScan: test projection=[col_int32]\ + \n SubqueryAlias: c\ + \n Filter: test.col_int32 IS NOT NULL\ + \n TableScan: test projection=[col_int32]"; + assert_eq!(expected, format!("{plan:?}")); +} + #[test] fn push_down_filter_groupby_expr_contains_alias() { let sql = "SELECT * FROM (SELECT (col_int32 + col_uint32) AS c, count(*) FROM test GROUP BY 1) where c > 3"; @@ -172,6 +265,20 @@ fn push_down_filter_groupby_expr_contains_alias() { assert_eq!(expected, format!("{plan:?}")); } +#[test] +// issue: https://github.com/apache/datafusion/issues/5334 +fn test_same_name_but_not_ambiguous() { + let sql = "SELECT t1.col_int32 AS col_int32 FROM test t1 intersect SELECT col_int32 FROM test t2"; + let plan = test_sql(sql).unwrap(); + let expected = "LeftSemi Join: t1.col_int32 = t2.col_int32\ + \n Aggregate: groupBy=[[t1.col_int32]], aggr=[[]]\ + \n SubqueryAlias: t1\ + \n TableScan: test projection=[col_int32]\ + \n SubqueryAlias: t2\ + \n TableScan: test projection=[col_int32]"; + assert_eq!(expected, format!("{plan:?}")); +} + #[test] fn eliminate_nested_filters() { let sql = "\ @@ -202,6 +309,32 @@ fn eliminate_redundant_null_check_on_count() { assert_eq!(expected, format!("{plan:?}")); } +#[test] +fn test_propagate_empty_relation_inner_join_and_unions() { + let sql = "\ + SELECT A.col_int32 FROM test AS A \ + INNER JOIN ( \ + SELECT col_int32 FROM test WHERE 1 = 0 \ + ) AS B ON A.col_int32 = B.col_int32 \ + UNION ALL \ + SELECT test.col_int32 FROM test WHERE 1 = 1 \ + UNION ALL \ + SELECT test.col_int32 FROM test WHERE 0 = 0 \ + UNION ALL \ + SELECT test.col_int32 FROM test WHERE test.col_int32 < 0 \ + UNION ALL \ + SELECT test.col_int32 FROM test WHERE 1 = 0"; + + let plan = test_sql(sql).unwrap(); + let expected = "\ + Union\ + \n TableScan: test projection=[col_int32]\ + \n TableScan: test projection=[col_int32]\ + \n Filter: test.col_int32 < Int32(0)\ + \n TableScan: test projection=[col_int32]"; + assert_eq!(expected, format!("{plan:?}")); +} + fn test_sql(sql: &str) -> Result { // parse the SQL let dialect = GenericDialect {}; // or AnsiDialect, or your own dialect ... @@ -318,4 +451,4 @@ impl TableSource for MyTableSource { fn schema(&self) -> SchemaRef { self.schema.clone() } -} +} \ No newline at end of file From 671ee932cc84b82f5ebe6c6ff5ee27be19e1b187 Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Sat, 20 Jul 2024 17:11:09 +0530 Subject: [PATCH 18/18] formatting --- datafusion/core/tests/optimizer_integration.rs | 10 +++++----- datafusion/optimizer/tests/optimizer_integration.rs | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/datafusion/core/tests/optimizer_integration.rs b/datafusion/core/tests/optimizer_integration.rs index 3214c40bf220..39f745cd3309 100644 --- a/datafusion/core/tests/optimizer_integration.rs +++ b/datafusion/core/tests/optimizer_integration.rs @@ -261,7 +261,7 @@ fn test_nested_schema_nullability() { vec![Some("table_name".into()), None], &Arc::new(schema), ) - .unwrap(); + .unwrap(); let expr = col("parent").field("child"); assert!(expr.nullable(&dfschema).unwrap()); @@ -334,9 +334,9 @@ fn test_inequalities_non_null_bounded() { } fn validate_simplified_cases(rewriter: &mut GuaranteeRewriter, cases: &[(Expr, T)]) - where - ScalarValue: From, - T: Clone, +where + ScalarValue: From, + T: Clone, { for (expr, expected_value) in cases { let output = expr.clone().rewrite(rewriter).data().unwrap(); @@ -357,4 +357,4 @@ fn validate_unchanged_cases(rewriter: &mut GuaranteeRewriter, cases: &[Expr]) { expr, output ); } -} \ No newline at end of file +} diff --git a/datafusion/optimizer/tests/optimizer_integration.rs b/datafusion/optimizer/tests/optimizer_integration.rs index 150a2f2d6eb3..c0863839dba1 100644 --- a/datafusion/optimizer/tests/optimizer_integration.rs +++ b/datafusion/optimizer/tests/optimizer_integration.rs @@ -451,4 +451,4 @@ impl TableSource for MyTableSource { fn schema(&self) -> SchemaRef { self.schema.clone() } -} \ No newline at end of file +}