From e1dc9627c648bd3ab110ebc383713a022bb37256 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Wed, 4 Jan 2023 12:00:59 +0000 Subject: [PATCH] Move ConfigOptions to core (#4803) * Move ConfigOptions to core * Fix defaults * Toml format --- datafusion-cli/Cargo.lock | 1 + datafusion-examples/examples/rewrite_expr.rs | 15 +++--- datafusion/common/Cargo.toml | 1 + datafusion/{core => common}/src/config.rs | 2 +- datafusion/common/src/lib.rs | 1 + datafusion/core/src/execution/context.rs | 32 ++---------- datafusion/core/src/lib.rs | 2 +- .../tests/sqllogictests/src/insert/util.rs | 5 +- .../optimizer/src/filter_null_join_keys.rs | 6 ++- datafusion/optimizer/src/optimizer.rs | 52 ++++++------------- .../optimizer/tests/integration-test.rs | 15 +++--- datafusion/sql/examples/sql.rs | 14 ++--- datafusion/sql/src/planner.rs | 28 +++------- 13 files changed, 65 insertions(+), 109 deletions(-) rename datafusion/{core => common}/src/config.rs (99%) diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index b78d5af9c937..5c0c00b7a343 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -717,6 +717,7 @@ version = "15.0.0" dependencies = [ "arrow", "chrono", + "num_cpus", "object_store", "parquet", "sqlparser", diff --git a/datafusion-examples/examples/rewrite_expr.rs b/datafusion-examples/examples/rewrite_expr.rs index 7dbaca2862ac..b63f8b1763d6 100644 --- a/datafusion-examples/examples/rewrite_expr.rs +++ b/datafusion-examples/examples/rewrite_expr.rs @@ -16,6 +16,7 @@ // under the License. use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion_common::config::ConfigOptions; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter}; use datafusion_expr::{ @@ -37,7 +38,7 @@ pub fn main() -> Result<()> { let statements = Parser::parse_sql(&dialect, sql)?; // produce a logical plan using the datafusion-sql crate - let context_provider = MyContextProvider {}; + let context_provider = MyContextProvider::default(); let sql_to_rel = SqlToRel::new(&context_provider); let logical_plan = sql_to_rel.sql_statement_to_plan(statements[0].clone())?; println!( @@ -120,7 +121,10 @@ impl ExprRewriter for MyExprRewriter { } } -struct MyContextProvider {} +#[derive(Default)] +struct MyContextProvider { + options: ConfigOptions, +} impl ContextProvider for MyContextProvider { fn get_table_provider(&self, name: TableReference) -> Result> { @@ -148,11 +152,8 @@ impl ContextProvider for MyContextProvider { None } - fn get_config_option( - &self, - _variable: &str, - ) -> Option { - None + fn options(&self) -> &ConfigOptions { + &self.options } } diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml index 9cf0126b50ec..cc93efa94d0f 100644 --- a/datafusion/common/Cargo.toml +++ b/datafusion/common/Cargo.toml @@ -43,6 +43,7 @@ apache-avro = { version = "0.14", default-features = false, features = ["snappy" arrow = { version = "29.0.0", default-features = false } chrono = { version = "0.4", default-features = false } cranelift-module = { version = "0.89.0", optional = true } +num_cpus = "1.13.0" object_store = { version = "0.5.0", default-features = false, optional = true } parquet = { version = "29.0.0", default-features = false, optional = true } pyo3 = { version = "0.17.1", optional = true } diff --git a/datafusion/core/src/config.rs b/datafusion/common/src/config.rs similarity index 99% rename from datafusion/core/src/config.rs rename to datafusion/common/src/config.rs index 4b3df66da671..51421c6b2506 100644 --- a/datafusion/core/src/config.rs +++ b/datafusion/common/src/config.rs @@ -17,7 +17,7 @@ //! DataFusion Configuration Options -use datafusion_common::{DataFusionError, Result}; +use crate::{DataFusionError, Result}; use std::any::Any; use std::collections::BTreeMap; use std::fmt::Display; diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index 392fa3f25a67..2935cd79639d 100644 --- a/datafusion/common/src/lib.rs +++ b/datafusion/common/src/lib.rs @@ -18,6 +18,7 @@ pub mod bisect; pub mod cast; mod column; +pub mod config; pub mod delta; mod dfschema; mod error; diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 70882d8fa47d..45131493e216 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -1757,18 +1757,8 @@ impl ContextProvider for SessionState { .and_then(|provider| provider.get(&provider_type)?.get_type(variable_names)) } - fn get_config_option(&self, variable: &str) -> Option { - // TOOD: Move ConfigOptions into common crate - match variable { - "datafusion.execution.time_zone" => self - .config - .options - .execution - .time_zone - .as_ref() - .map(|s| ScalarValue::Utf8(Some(s.clone()))), - _ => unimplemented!(), - } + fn options(&self) -> &ConfigOptions { + self.config_options() } } @@ -1803,22 +1793,8 @@ impl OptimizerConfig for SessionState { self.execution_props.query_execution_start_time } - fn rule_enabled(&self, name: &str) -> bool { - use datafusion_optimizer::filter_null_join_keys::FilterNullJoinKeys; - match name { - FilterNullJoinKeys::NAME => { - self.config_options().optimizer.filter_null_join_keys - } - _ => true, - } - } - - fn skip_failing_rules(&self) -> bool { - self.config_options().optimizer.skip_failed_rules - } - - fn max_passes(&self) -> u8 { - self.config_options().optimizer.max_passes as _ + fn options(&self) -> &ConfigOptions { + self.config_options() } } diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index abfa1ba38109..a1cbd653e69b 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -215,7 +215,6 @@ extern crate sqlparser; pub mod avro_to_arrow; pub mod catalog; -pub mod config; pub mod dataframe; pub mod datasource; pub mod error; @@ -234,6 +233,7 @@ pub use parquet; // re-export DataFusion crates pub use datafusion_common as common; +pub use datafusion_common::config; pub use datafusion_expr as logical_expr; pub use datafusion_optimizer as optimizer; pub use datafusion_physical_expr as physical_expr; diff --git a/datafusion/core/tests/sqllogictests/src/insert/util.rs b/datafusion/core/tests/sqllogictests/src/insert/util.rs index 03dbb72995ff..41dfbad39424 100644 --- a/datafusion/core/tests/sqllogictests/src/insert/util.rs +++ b/datafusion/core/tests/sqllogictests/src/insert/util.rs @@ -16,7 +16,8 @@ // under the License. use arrow::datatypes::DataType; -use datafusion_common::{ScalarValue, TableReference}; +use datafusion_common::config::ConfigOptions; +use datafusion_common::TableReference; use datafusion_expr::{AggregateUDF, ScalarUDF, TableSource}; use datafusion_sql::planner::ContextProvider; use std::sync::Arc; @@ -44,7 +45,7 @@ impl ContextProvider for LogicTestContextProvider { todo!() } - fn get_config_option(&self, _variable: &str) -> Option { + fn options(&self) -> &ConfigOptions { todo!() } } diff --git a/datafusion/optimizer/src/filter_null_join_keys.rs b/datafusion/optimizer/src/filter_null_join_keys.rs index 8f221eaccd35..95cd8a9fd36c 100644 --- a/datafusion/optimizer/src/filter_null_join_keys.rs +++ b/datafusion/optimizer/src/filter_null_join_keys.rs @@ -43,8 +43,12 @@ impl OptimizerRule for FilterNullJoinKeys { fn try_optimize( &self, plan: &LogicalPlan, - _config: &dyn OptimizerConfig, + config: &dyn OptimizerConfig, ) -> Result> { + if !config.options().optimizer.filter_null_join_keys { + return Ok(None); + } + match plan { LogicalPlan::Join(join) if join.join_type == JoinType::Inner => { let mut join = join.clone(); diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 762330a6d278..3e7df55d5b1b 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -38,6 +38,7 @@ use crate::single_distinct_to_groupby::SingleDistinctToGroupBy; use crate::type_coercion::TypeCoercion; use crate::unwrap_cast_in_comparison::UnwrapCastInComparison; use chrono::{DateTime, Utc}; +use datafusion_common::config::ConfigOptions; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::logical_plan::LogicalPlan; use log::{debug, trace, warn}; @@ -74,14 +75,7 @@ pub trait OptimizerConfig { /// time is used as the value for now() fn query_execution_start_time(&self) -> DateTime; - /// Returns false if the given rule should be skipped - fn rule_enabled(&self, name: &str) -> bool; - - /// The optimizer will skip failing rules if this returns true - fn skip_failing_rules(&self) -> bool; - - /// How many times to attempt to optimize the plan - fn max_passes(&self) -> u8; + fn options(&self) -> &ConfigOptions; } /// A standalone [`OptimizerConfig`] that can be used independently @@ -91,28 +85,25 @@ pub struct OptimizerContext { /// Query execution start time that can be used to rewrite /// expressions such as `now()` to use a literal value instead query_execution_start_time: DateTime, - /// Option to skip rules that produce errors - skip_failing_rules: bool, - /// Specify whether to enable the filter_null_keys rule - filter_null_keys: bool, - /// Maximum number of times to run optimizer against a plan - max_passes: u8, + + options: ConfigOptions, } impl OptimizerContext { /// Create optimizer config pub fn new() -> Self { + let mut options = ConfigOptions::default(); + options.optimizer.filter_null_join_keys = true; + Self { query_execution_start_time: Utc::now(), - skip_failing_rules: true, - filter_null_keys: true, - max_passes: 3, + options, } } /// Specify whether to enable the filter_null_keys rule pub fn filter_null_keys(mut self, filter_null_keys: bool) -> Self { - self.filter_null_keys = filter_null_keys; + self.options.optimizer.filter_null_join_keys = filter_null_keys; self } @@ -129,13 +120,13 @@ impl OptimizerContext { /// Specify whether the optimizer should skip rules that produce /// errors, or fail the query pub fn with_skip_failing_rules(mut self, b: bool) -> Self { - self.skip_failing_rules = b; + self.options.optimizer.skip_failed_rules = b; self } /// Specify how many times to attempt to optimize the plan pub fn with_max_passes(mut self, v: u8) -> Self { - self.max_passes = v; + self.options.optimizer.max_passes = v as usize; self } } @@ -152,16 +143,8 @@ impl OptimizerConfig for OptimizerContext { self.query_execution_start_time } - fn rule_enabled(&self, name: &str) -> bool { - self.filter_null_keys || name != FilterNullJoinKeys::NAME - } - - fn skip_failing_rules(&self) -> bool { - self.skip_failing_rules - } - - fn max_passes(&self) -> u8 { - self.max_passes + fn options(&self) -> &ConfigOptions { + &self.options } } @@ -271,18 +254,15 @@ impl Optimizer { where F: FnMut(&LogicalPlan, &dyn OptimizerRule), { + let options = config.options(); let start_time = Instant::now(); let mut plan_str = format!("{}", plan.display_indent()); let mut new_plan = plan.clone(); let mut i = 0; - while i < config.max_passes() { + while i < options.optimizer.max_passes { log_plan(&format!("Optimizer input (pass {i})"), &new_plan); for rule in &self.rules { - if !config.rule_enabled(rule.name()) { - debug!("Skipping rule {} due to optimizer config", rule.name()); - continue; - } let result = self.optimize_recursively(rule, &new_plan, config); match result { @@ -308,7 +288,7 @@ impl Optimizer { ); } Err(ref e) => { - if config.skip_failing_rules() { + if options.optimizer.skip_failed_rules { // Note to future readers: if you see this warning it signals a // bug in the DataFusion optimizer. Please consider filing a ticket // https://github.com/apache/arrow-datafusion diff --git a/datafusion/optimizer/tests/integration-test.rs b/datafusion/optimizer/tests/integration-test.rs index 7d5c121813f0..f901e33d41d6 100644 --- a/datafusion/optimizer/tests/integration-test.rs +++ b/datafusion/optimizer/tests/integration-test.rs @@ -17,6 +17,7 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit}; use chrono::{DateTime, NaiveDateTime, Utc}; +use datafusion_common::config::ConfigOptions; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::{AggregateUDF, LogicalPlan, ScalarUDF, TableSource}; use datafusion_optimizer::optimizer::Optimizer; @@ -323,7 +324,7 @@ fn test_sql(sql: &str) -> Result { let statement = &ast[0]; // create a logical query plan - let schema_provider = MySchemaProvider {}; + let schema_provider = MySchemaProvider::default(); let sql_to_rel = SqlToRel::new(&schema_provider); let plan = sql_to_rel.sql_statement_to_plan(statement.clone()).unwrap(); @@ -338,7 +339,10 @@ fn test_sql(sql: &str) -> Result { optimizer.optimize(&plan, &config, &observe) } -struct MySchemaProvider {} +#[derive(Default)] +struct MySchemaProvider { + options: ConfigOptions, +} impl ContextProvider for MySchemaProvider { fn get_table_provider( @@ -390,11 +394,8 @@ impl ContextProvider for MySchemaProvider { None } - fn get_config_option( - &self, - _variable: &str, - ) -> Option { - None + fn options(&self) -> &ConfigOptions { + &self.options } } diff --git a/datafusion/sql/examples/sql.rs b/datafusion/sql/examples/sql.rs index 8d6135aabea4..b0a0a73e7ce7 100644 --- a/datafusion/sql/examples/sql.rs +++ b/datafusion/sql/examples/sql.rs @@ -16,6 +16,7 @@ // under the License. use arrow_schema::{DataType, Field, Schema}; +use datafusion_common::config::ConfigOptions; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::{ logical_plan::builder::LogicalTableSource, AggregateUDF, ScalarUDF, TableSource, @@ -56,6 +57,7 @@ fn main() { } struct MySchemaProvider { + options: ConfigOptions, tables: HashMap>, } @@ -88,7 +90,10 @@ impl MySchemaProvider { Field::new("price", DataType::Decimal128(10, 2), false), ]), ); - Self { tables } + Self { + tables, + options: Default::default(), + } } } @@ -121,10 +126,7 @@ impl ContextProvider for MySchemaProvider { None } - fn get_config_option( - &self, - _variable: &str, - ) -> Option { - None + fn options(&self) -> &ConfigOptions { + &self.options } } diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index fe08a0bfc4d3..255b50ca56dc 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -33,6 +33,7 @@ use sqlparser::ast::{ObjectType, OrderByExpr, Statement}; use sqlparser::ast::{TimezoneInfo, WildcardAdditionalOptions}; use sqlparser::parser::ParserError::ParserError; +use datafusion_common::config::ConfigOptions; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::ToDFSchema; use datafusion_common::{ @@ -82,8 +83,9 @@ pub trait ContextProvider { fn get_aggregate_meta(&self, name: &str) -> Option>; /// Getter for system/user-defined variable type fn get_variable_type(&self, variable_names: &[String]) -> Option; - /// Getter for config_options - fn get_config_option(&self, variable: &str) -> Option; + + /// Get configuration options + fn options(&self) -> &ConfigOptions; } /// SQL parser options @@ -1814,22 +1816,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { // Timestamp With Time Zone // INPUT : [SQLDataType] TimestampTz + [RuntimeConfig] Time Zone // OUTPUT: [ArrowDataType] Timestamp - match self - .schema_provider - .get_config_option("datafusion.execution.time_zone") - { - Some(ScalarValue::Utf8(s)) => s, - Some(v) => { - return Err(DataFusionError::Internal(format!( - "Incorrect data type for time_zone: {}", - v.get_datatype(), - ))); - } - None => return Err(DataFusionError::Internal( - "Config Option datafusion.execution.time_zone doesn't exist" - .to_string(), - )), - } + self.schema_provider.options().execution.time_zone.clone() } else { // Timestamp Without Time zone None @@ -4163,6 +4150,7 @@ mod tests { #[derive(Default)] struct MockContextProvider { + options: ConfigOptions, udafs: HashMap>, } @@ -4258,8 +4246,8 @@ mod tests { unimplemented!() } - fn get_config_option(&self, _: &str) -> Option { - unimplemented!() + fn options(&self) -> &ConfigOptions { + &self.options } }