Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move sql_compound_identifier_to_expr to ExprPlanner #11487

Merged
merged 20 commits into from
Jul 21, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

147 changes: 146 additions & 1 deletion datafusion/core/tests/optimizer_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ use datafusion_sql::sqlparser::parser::Parser;
use datafusion_sql::TableReference;

use chrono::DateTime;
use datafusion_expr::planner::ExprPlanner;
use datafusion_functions::core::planner::CoreFunctionPlanner;
use datafusion_functions::datetime;

#[cfg(test)]
Expand Down Expand Up @@ -115,6 +117,139 @@ fn concat_ws_literals() -> Result<()> {
Ok(())
}

#[test]
fn anti_join_with_join_filter() -> Result<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why were these tests moved? Does that mean we can't plan SQL with identifiers like `test.col_int32` without an `ExprPlanner`?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the earlier implementation, I had also moved the planning of basic SQL, such as column references, to the expr planner, so we had to move these test cases from the optimizer crate to core to avoid an optimizer crate dependency on functions.

Since we have moved that out of the planner, I should have moved the test cases back. My bad, I missed them. Thanks for catching this. I have fixed this.

// regression test for https://github.com/apache/datafusion/issues/2888
let sql = "SELECT col_utf8 FROM test WHERE NOT EXISTS (\
SELECT col_utf8 FROM test t2 WHERE test.col_int32 = t2.col_int32 \
AND test.col_uint32 != t2.col_uint32)";
let plan = test_sql(sql)?;
let expected = "Projection: test.col_utf8\
\n LeftAnti Join: test.col_int32 = __correlated_sq_1.col_int32 Filter: test.col_uint32 != __correlated_sq_1.col_uint32\
\n TableScan: test projection=[col_int32, col_uint32, col_utf8]\
\n SubqueryAlias: __correlated_sq_1\
\n SubqueryAlias: t2\
\n TableScan: test projection=[col_int32, col_uint32]";
assert_eq!(expected, format!("{plan:?}"));
Ok(())
}

/// A correlated `EXISTS` subquery that uses `SELECT DISTINCT` must be planned
/// as a `LeftSemi Join` whose subquery side is an `Aggregate`.
#[test]
fn where_exists_distinct() -> Result<()> {
    let query = "SELECT col_int32 FROM test WHERE EXISTS (\
        SELECT DISTINCT col_int32 FROM test t2 WHERE test.col_int32 = t2.col_int32)";
    let expected_plan = "LeftSemi Join: test.col_int32 = __correlated_sq_1.col_int32\
        \n TableScan: test projection=[col_int32]\
        \n SubqueryAlias: __correlated_sq_1\
        \n Aggregate: groupBy=[[t2.col_int32]], aggr=[[]]\
        \n SubqueryAlias: t2\
        \n TableScan: test projection=[col_int32]";

    // Planning/optimization errors are propagated to the test harness.
    let optimized = test_sql(query)?;
    let actual_plan = format!("{optimized:?}");
    assert_eq!(expected_plan, actual_plan);
    Ok(())
}

/// Joining against a subquery filtered by `WHERE false` must collapse the
/// whole plan into a single `EmptyRelation`.
#[test]
fn propagate_empty_relation() {
    let query = "SELECT test.col_int32 FROM test JOIN ( SELECT col_int32 FROM test WHERE false ) AS ta1 ON test.col_int32 = ta1.col_int32;";
    // An `EmptyRelation` child is propagated bottom-up through the join.
    let expected_plan = "EmptyRelation";

    let optimized = test_sql(query).unwrap();
    let actual_plan = format!("{optimized:?}");
    assert_eq!(expected_plan, actual_plan);
}

/// Join keys that are reached through subquery aliases (`A`, `B`) must be
/// recognised, producing an `Inner Join` on `a.col_int32 = b.key`.
#[test]
fn join_keys_in_subquery_alias() {
    let query = "SELECT * FROM test AS A, ( SELECT col_int32 as key FROM test ) AS B where A.col_int32 = B.key;";
    let expected_plan = "Inner Join: a.col_int32 = b.key\
        \n SubqueryAlias: a\
        \n Filter: test.col_int32 IS NOT NULL\
        \n TableScan: test projection=[col_int32, col_uint32, col_utf8, col_date32, col_date64, col_ts_nano_none, col_ts_nano_utc]\
        \n SubqueryAlias: b\
        \n Projection: test.col_int32 AS key\
        \n Filter: test.col_int32 IS NOT NULL\
        \n TableScan: test projection=[col_int32]";

    let optimized = test_sql(query).unwrap();
    let actual_plan = format!("{optimized:?}");
    assert_eq!(expected_plan, actual_plan);
}

/// Same as `join_keys_in_subquery_alias`, but the aliased subquery `B` itself
/// contains a join, so the expected plan nests a second `Inner Join`.
#[test]
fn join_keys_in_subquery_alias_1() {
    let query = "SELECT * FROM test AS A, ( SELECT test.col_int32 AS key FROM test JOIN test AS C on test.col_int32 = C.col_int32 ) AS B where A.col_int32 = B.key;";
    let expected_plan = "Inner Join: a.col_int32 = b.key\
        \n SubqueryAlias: a\
        \n Filter: test.col_int32 IS NOT NULL\
        \n TableScan: test projection=[col_int32, col_uint32, col_utf8, col_date32, col_date64, col_ts_nano_none, col_ts_nano_utc]\
        \n SubqueryAlias: b\
        \n Projection: test.col_int32 AS key\
        \n Inner Join: test.col_int32 = c.col_int32\
        \n Filter: test.col_int32 IS NOT NULL\
        \n TableScan: test projection=[col_int32]\
        \n SubqueryAlias: c\
        \n Filter: test.col_int32 IS NOT NULL\
        \n TableScan: test projection=[col_int32]";

    let optimized = test_sql(query).unwrap();
    let actual_plan = format!("{optimized:?}");
    assert_eq!(expected_plan, actual_plan);
}

/// An `INTERSECT` whose two sides expose the same column name through
/// different table aliases must plan without an ambiguity error.
///
/// issue: https://github.com/apache/datafusion/issues/5334
#[test]
fn test_same_name_but_not_ambiguous() {
    let query = "SELECT t1.col_int32 AS col_int32 FROM test t1 intersect SELECT col_int32 FROM test t2";
    let expected_plan = "LeftSemi Join: t1.col_int32 = t2.col_int32\
        \n Aggregate: groupBy=[[t1.col_int32]], aggr=[[]]\
        \n SubqueryAlias: t1\
        \n TableScan: test projection=[col_int32]\
        \n SubqueryAlias: t2\
        \n TableScan: test projection=[col_int32]";

    let optimized = test_sql(query).unwrap();
    let actual_plan = format!("{optimized:?}");
    assert_eq!(expected_plan, actual_plan);
}

/// `UNION ALL` branches that can never yield rows (`1 = 0`, or a join against
/// such a branch) must be dropped, leaving only the three live inputs.
#[test]
fn test_propagate_empty_relation_inner_join_and_unions() {
    let query = "\
        SELECT A.col_int32 FROM test AS A \
        INNER JOIN ( \
        SELECT col_int32 FROM test WHERE 1 = 0 \
        ) AS B ON A.col_int32 = B.col_int32 \
        UNION ALL \
        SELECT test.col_int32 FROM test WHERE 1 = 1 \
        UNION ALL \
        SELECT test.col_int32 FROM test WHERE 0 = 0 \
        UNION ALL \
        SELECT test.col_int32 FROM test WHERE test.col_int32 < 0 \
        UNION ALL \
        SELECT test.col_int32 FROM test WHERE 1 = 0";
    let expected_plan = "\
        Union\
        \n TableScan: test projection=[col_int32]\
        \n TableScan: test projection=[col_int32]\
        \n Filter: test.col_int32 < Int32(0)\
        \n TableScan: test projection=[col_int32]";

    let optimized = test_sql(query).unwrap();
    let actual_plan = format!("{optimized:?}");
    assert_eq!(expected_plan, actual_plan);
}

/// A correlated `EXISTS` with both an equality and a non-equality predicate
/// must become a `LeftSemi Join` that keeps the non-equality as a join filter.
///
/// regression test for https://github.com/apache/datafusion/issues/2888
#[test]
fn semi_join_with_join_filter() -> Result<()> {
    let query = "SELECT col_utf8 FROM test WHERE EXISTS (\
        SELECT col_utf8 FROM test t2 WHERE test.col_int32 = t2.col_int32 \
        AND test.col_uint32 != t2.col_uint32)";
    let expected_plan = "Projection: test.col_utf8\
        \n LeftSemi Join: test.col_int32 = __correlated_sq_1.col_int32 Filter: test.col_uint32 != __correlated_sq_1.col_uint32\
        \n TableScan: test projection=[col_int32, col_uint32, col_utf8]\
        \n SubqueryAlias: __correlated_sq_1\
        \n SubqueryAlias: t2\
        \n TableScan: test projection=[col_int32, col_uint32]";

    // Planning/optimization errors are propagated to the test harness.
    let optimized = test_sql(query)?;
    let actual_plan = format!("{optimized:?}");
    assert_eq!(expected_plan, actual_plan);
    Ok(())
}

fn quick_test(sql: &str, expected_plan: &str) {
let plan = test_sql(sql).unwrap();
assert_eq!(expected_plan, format!("{:?}", plan));
Expand All @@ -131,7 +266,8 @@ fn test_sql(sql: &str) -> Result<LogicalPlan> {
.with_udf(datetime::now())
.with_udf(datafusion_functions::core::arrow_cast())
.with_udf(datafusion_functions::string::concat())
.with_udf(datafusion_functions::string::concat_ws());
.with_udf(datafusion_functions::string::concat_ws())
.with_expr_planner(Arc::new(CoreFunctionPlanner::default()));
let sql_to_rel = SqlToRel::new(&context_provider);
let plan = sql_to_rel.sql_statement_to_plan(statement.clone()).unwrap();

Expand All @@ -151,13 +287,18 @@ fn test_sql(sql: &str) -> Result<LogicalPlan> {
// Test-only provider; it implements `ContextProvider` further down in this file.
struct MyContextProvider {
// Configuration options held by the provider (how they are consumed is not
// visible in this hunk).
options: ConfigOptions,
// Scalar UDFs keyed by `udf.name()`; populated through `with_udf`.
udfs: HashMap<String, Arc<ScalarUDF>>,
// Expression planners in registration order; appended by `with_expr_planner`
// and handed out as a slice by `get_expr_planners`.
expr_planners: Vec<Arc<dyn ExprPlanner>>,
}

impl MyContextProvider {
    /// Registers `udf` under its own name and returns the provider so calls
    /// can be chained. A UDF already stored under that name is replaced.
    fn with_udf(mut self, udf: Arc<ScalarUDF>) -> Self {
        let key = udf.name().to_string();
        self.udfs.insert(key, udf);
        self
    }

    /// Appends `planner` to the list of expression planners and returns the
    /// provider so calls can be chained.
    fn with_expr_planner(mut self, planner: Arc<dyn ExprPlanner>) -> Self {
        self.expr_planners.push(planner);
        self
    }
}

impl ContextProvider for MyContextProvider {
Expand Down Expand Up @@ -226,6 +367,10 @@ impl ContextProvider for MyContextProvider {
fn udwf_names(&self) -> Vec<String> {
Vec::new()
}

fn get_expr_planners(&self) -> &[Arc<dyn ExprPlanner>] {
&self.expr_planners
}
}

struct MyTableSource {
Expand Down
20 changes: 19 additions & 1 deletion datafusion/expr/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

use std::sync::Arc;

use arrow::datatypes::{DataType, SchemaRef};
use arrow::datatypes::{DataType, Field, SchemaRef};
use datafusion_common::{
config::ConfigOptions, file_options::file_type::FileType, not_impl_err, DFSchema,
Result, TableReference,
Expand Down Expand Up @@ -173,6 +173,24 @@ pub trait ExprPlanner: Send + Sync {
fn plan_overlay(&self, args: Vec<Expr>) -> Result<PlannerResult<Vec<Expr>>> {
// Default implementation: performs no planning and returns the arguments
// unchanged, wrapped in `PlannerResult::Original`.
Ok(PlannerResult::Original(args))
}

/// Plans a `get_field` call from its arguments.
///
/// The default implementation performs no planning and returns the arguments
/// unchanged, wrapped in `PlannerResult::Original`.
fn plan_get_field(&self, args: Vec<Expr>) -> Result<PlannerResult<Vec<Expr>>> {
dharanad marked this conversation as resolved.
Show resolved Hide resolved
Ok(PlannerResult::Original(args))
}
/// Plans compound identifier eg `db.schema.table`.
///
/// Note:
/// Currently compound identifier for outer query schema is not supported.
///
/// Returns empty expression arguments if not possible
dharanad marked this conversation as resolved.
Show resolved Hide resolved
fn plan_compound_identifier(
&self,
_filed: &Field,
dharanad marked this conversation as resolved.
Show resolved Hide resolved
_qualifier: Option<&TableReference>,
_nested_names: &[String],
) -> Result<PlannerResult<Vec<Expr>>> {
Ok(PlannerResult::Original(vec![]))
}
}

/// An operator with two arguments to plan
Expand Down
1 change: 0 additions & 1 deletion datafusion/functions/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ pub fn functions() -> Vec<Arc<ScalarUDF>> {
nvl2(),
arrow_typeof(),
named_struct(),
get_field(),
coalesce(),
make_map(),
map(),
Expand Down
35 changes: 33 additions & 2 deletions datafusion/functions/src/core/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
// specific language governing permissions and limitations
// under the License.

use datafusion_common::DFSchema;
use arrow::datatypes::Field;
use datafusion_common::Result;
use datafusion_common::{not_impl_err, Column, DFSchema, ScalarValue, TableReference};
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::planner::{ExprPlanner, PlannerResult, RawDictionaryExpr};
use datafusion_expr::Expr;
use datafusion_expr::{lit, Expr};

use super::named_struct;

Expand Down Expand Up @@ -62,4 +63,34 @@ impl ExprPlanner for CoreFunctionPlanner {
ScalarFunction::new_udf(crate::string::overlay(), args),
)))
}

/// Plans a compound identifier that has already been matched to `field`.
///
/// * No spare identifiers: the result is a plain column reference.
/// * Exactly one spare identifier: the result is a `get_field` call on the
///   column, with the spare identifier as a literal argument.
/// * More than one spare identifier: a "not implemented" error is returned.
fn plan_compound_identifier(
    &self,
    field: &Field,
    qualifier: Option<&TableReference>,
    nested_names: &[String],
) -> Result<PlannerResult<Vec<Expr>>> {
    match nested_names {
        // Found a matching field with no spare identifier(s).
        [] => {
            let column = Column::from((qualifier, field));
            Ok(PlannerResult::Planned(Expr::Column(column)))
        }
        // Found a matching field with a single spare identifier naming a
        // nested field in the structure.
        [nested_name] => {
            let nested_name = nested_name.to_string();
            let column = Expr::Column(Column::from((qualifier, field)));
            let key = lit(ScalarValue::from(nested_name));
            let call = ScalarFunction::new_udf(crate::core::get_field(), vec![column, key]);
            Ok(PlannerResult::Planned(Expr::ScalarFunction(call)))
        }
        // TODO: remove when can support multiple nested identifiers
        _ => not_impl_err!(
            "Nested identifiers not yet supported for column {}",
            Column::from((qualifier, field)).quoted_flat_name()
        ),
    }
}
}
1 change: 1 addition & 0 deletions datafusion/optimizer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ async-trait = { workspace = true }
chrono = { workspace = true }
datafusion-common = { workspace = true, default-features = true }
datafusion-expr = { workspace = true }
datafusion-functions = { workspace = true }
datafusion-physical-expr = { workspace = true }
hashbrown = { workspace = true }
indexmap = { workspace = true }
Expand Down
Loading