Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move ConfigOptions to core #4803

Merged
merged 5 commits into from
Jan 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 8 additions & 7 deletions datafusion-examples/examples/rewrite_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::config::ConfigOptions;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter};
use datafusion_expr::{
Expand All @@ -37,7 +38,7 @@ pub fn main() -> Result<()> {
let statements = Parser::parse_sql(&dialect, sql)?;

// produce a logical plan using the datafusion-sql crate
let context_provider = MyContextProvider {};
let context_provider = MyContextProvider::default();
let sql_to_rel = SqlToRel::new(&context_provider);
let logical_plan = sql_to_rel.sql_statement_to_plan(statements[0].clone())?;
println!(
Expand Down Expand Up @@ -120,7 +121,10 @@ impl ExprRewriter for MyExprRewriter {
}
}

struct MyContextProvider {}
#[derive(Default)]
struct MyContextProvider {
options: ConfigOptions,
}

impl ContextProvider for MyContextProvider {
fn get_table_provider(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
Expand Down Expand Up @@ -148,11 +152,8 @@ impl ContextProvider for MyContextProvider {
None
}

fn get_config_option(
&self,
_variable: &str,
) -> Option<datafusion_common::ScalarValue> {
None
fn options(&self) -> &ConfigOptions {
&self.options
}
}

Expand Down
1 change: 1 addition & 0 deletions datafusion/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ apache-avro = { version = "0.14", default-features = false, features = ["snappy"
arrow = { version = "29.0.0", default-features = false }
chrono = { version = "0.4", default-features = false }
cranelift-module = { version = "0.89.0", optional = true }
num_cpus = "1.13.0"
object_store = { version = "0.5.0", default-features = false, optional = true }
parquet = { version = "29.0.0", default-features = false, optional = true }
pyo3 = { version = "0.17.1", optional = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//! DataFusion Configuration Options

use datafusion_common::{DataFusionError, Result};
use crate::{DataFusionError, Result};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎉

use std::any::Any;
use std::collections::BTreeMap;
use std::fmt::Display;
Expand Down
1 change: 1 addition & 0 deletions datafusion/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
pub mod bisect;
pub mod cast;
mod column;
pub mod config;
pub mod delta;
mod dfschema;
mod error;
Expand Down
32 changes: 4 additions & 28 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1757,18 +1757,8 @@ impl ContextProvider for SessionState {
.and_then(|provider| provider.get(&provider_type)?.get_type(variable_names))
}

fn get_config_option(&self, variable: &str) -> Option<ScalarValue> {
// TOOD: Move ConfigOptions into common crate
match variable {
"datafusion.execution.time_zone" => self
.config
.options
.execution
.time_zone
.as_ref()
.map(|s| ScalarValue::Utf8(Some(s.clone()))),
_ => unimplemented!(),
}
fn options(&self) -> &ConfigOptions {
self.config_options()
}
}

Expand Down Expand Up @@ -1803,22 +1793,8 @@ impl OptimizerConfig for SessionState {
self.execution_props.query_execution_start_time
}

fn rule_enabled(&self, name: &str) -> bool {
use datafusion_optimizer::filter_null_join_keys::FilterNullJoinKeys;
match name {
FilterNullJoinKeys::NAME => {
self.config_options().optimizer.filter_null_join_keys
}
_ => true,
}
}

fn skip_failing_rules(&self) -> bool {
self.config_options().optimizer.skip_failed_rules
}

fn max_passes(&self) -> u8 {
self.config_options().optimizer.max_passes as _
fn options(&self) -> &ConfigOptions {
self.config_options()
}
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,6 @@ extern crate sqlparser;

pub mod avro_to_arrow;
pub mod catalog;
pub mod config;
pub mod dataframe;
pub mod datasource;
pub mod error;
Expand All @@ -234,6 +233,7 @@ pub use parquet;

// re-export DataFusion crates
pub use datafusion_common as common;
pub use datafusion_common::config;
pub use datafusion_expr as logical_expr;
pub use datafusion_optimizer as optimizer;
pub use datafusion_physical_expr as physical_expr;
Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/tests/sqllogictests/src/insert/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
// under the License.

use arrow::datatypes::DataType;
use datafusion_common::{ScalarValue, TableReference};
use datafusion_common::config::ConfigOptions;
use datafusion_common::TableReference;
use datafusion_expr::{AggregateUDF, ScalarUDF, TableSource};
use datafusion_sql::planner::ContextProvider;
use std::sync::Arc;
Expand Down Expand Up @@ -44,7 +45,7 @@ impl ContextProvider for LogicTestContextProvider {
todo!()
}

fn get_config_option(&self, _variable: &str) -> Option<ScalarValue> {
fn options(&self) -> &ConfigOptions {
todo!()
}
}
6 changes: 5 additions & 1 deletion datafusion/optimizer/src/filter_null_join_keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,12 @@ impl OptimizerRule for FilterNullJoinKeys {
fn try_optimize(
&self,
plan: &LogicalPlan,
_config: &dyn OptimizerConfig,
config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
if !config.options().optimizer.filter_null_join_keys {
return Ok(None);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe it is worth a debug! here if this is skipped

}

match plan {
LogicalPlan::Join(join) if join.join_type == JoinType::Inner => {
let mut join = join.clone();
Expand Down
52 changes: 16 additions & 36 deletions datafusion/optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use crate::single_distinct_to_groupby::SingleDistinctToGroupBy;
use crate::type_coercion::TypeCoercion;
use crate::unwrap_cast_in_comparison::UnwrapCastInComparison;
use chrono::{DateTime, Utc};
use datafusion_common::config::ConfigOptions;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::logical_plan::LogicalPlan;
use log::{debug, trace, warn};
Expand Down Expand Up @@ -74,14 +75,7 @@ pub trait OptimizerConfig {
/// time is used as the value for now()
fn query_execution_start_time(&self) -> DateTime<Utc>;

/// Returns false if the given rule should be skipped
fn rule_enabled(&self, name: &str) -> bool;

/// The optimizer will skip failing rules if this returns true
fn skip_failing_rules(&self) -> bool;

/// How many times to attempt to optimize the plan
fn max_passes(&self) -> u8;
fn options(&self) -> &ConfigOptions;
}

/// A standalone [`OptimizerConfig`] that can be used independently
Expand All @@ -91,28 +85,25 @@ pub struct OptimizerContext {
/// Query execution start time that can be used to rewrite
/// expressions such as `now()` to use a literal value instead
query_execution_start_time: DateTime<Utc>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

very nice

/// Option to skip rules that produce errors
skip_failing_rules: bool,
/// Specify whether to enable the filter_null_keys rule
filter_null_keys: bool,
/// Maximum number of times to run optimizer against a plan
max_passes: u8,

options: ConfigOptions,
}

impl OptimizerContext {
/// Create optimizer config
pub fn new() -> Self {
let mut options = ConfigOptions::default();
options.optimizer.filter_null_join_keys = true;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I double-checked this wasn't introduced by the recent config rework, and confirmed that it has always been the case that OptimizerContext defaults this to true, but ConfigOptions defaults it to false.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My intention was always to have this default to false so this looks like a bug

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OptimizerContext is now only used within tests, and so I am going to leave this for now. If you feel strongly I can file a follow up PR to change this, but it creates a non-trivial amount of test-churn


Self {
query_execution_start_time: Utc::now(),
skip_failing_rules: true,
filter_null_keys: true,
max_passes: 3,
options,
}
}

/// Specify whether to enable the filter_null_keys rule
pub fn filter_null_keys(mut self, filter_null_keys: bool) -> Self {
self.filter_null_keys = filter_null_keys;
self.options.optimizer.filter_null_join_keys = filter_null_keys;
self
}

Expand All @@ -129,13 +120,13 @@ impl OptimizerContext {
/// Specify whether the optimizer should skip rules that produce
/// errors, or fail the query
pub fn with_skip_failing_rules(mut self, b: bool) -> Self {
self.skip_failing_rules = b;
self.options.optimizer.skip_failed_rules = b;
self
}

/// Specify how many times to attempt to optimize the plan
pub fn with_max_passes(mut self, v: u8) -> Self {
self.max_passes = v;
self.options.optimizer.max_passes = v as usize;
self
}
}
Expand All @@ -152,16 +143,8 @@ impl OptimizerConfig for OptimizerContext {
self.query_execution_start_time
}

fn rule_enabled(&self, name: &str) -> bool {
self.filter_null_keys || name != FilterNullJoinKeys::NAME
}

fn skip_failing_rules(&self) -> bool {
self.skip_failing_rules
}

fn max_passes(&self) -> u8 {
self.max_passes
fn options(&self) -> &ConfigOptions {
&self.options
}
}

Expand Down Expand Up @@ -271,18 +254,15 @@ impl Optimizer {
where
F: FnMut(&LogicalPlan, &dyn OptimizerRule),
{
let options = config.options();
let start_time = Instant::now();
let mut plan_str = format!("{}", plan.display_indent());
let mut new_plan = plan.clone();
let mut i = 0;
while i < config.max_passes() {
while i < options.optimizer.max_passes {
log_plan(&format!("Optimizer input (pass {i})"), &new_plan);

for rule in &self.rules {
if !config.rule_enabled(rule.name()) {
debug!("Skipping rule {} due to optimizer config", rule.name());
continue;
}
let result = self.optimize_recursively(rule, &new_plan, config);

match result {
Expand All @@ -308,7 +288,7 @@ impl Optimizer {
);
}
Err(ref e) => {
if config.skip_failing_rules() {
if options.optimizer.skip_failed_rules {
// Note to future readers: if you see this warning it signals a
// bug in the DataFusion optimizer. Please consider filing a ticket
// https://github.com/apache/arrow-datafusion
Expand Down
15 changes: 8 additions & 7 deletions datafusion/optimizer/tests/integration-test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
use chrono::{DateTime, NaiveDateTime, Utc};
use datafusion_common::config::ConfigOptions;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::{AggregateUDF, LogicalPlan, ScalarUDF, TableSource};
use datafusion_optimizer::optimizer::Optimizer;
Expand Down Expand Up @@ -323,7 +324,7 @@ fn test_sql(sql: &str) -> Result<LogicalPlan> {
let statement = &ast[0];

// create a logical query plan
let schema_provider = MySchemaProvider {};
let schema_provider = MySchemaProvider::default();
let sql_to_rel = SqlToRel::new(&schema_provider);
let plan = sql_to_rel.sql_statement_to_plan(statement.clone()).unwrap();

Expand All @@ -338,7 +339,10 @@ fn test_sql(sql: &str) -> Result<LogicalPlan> {
optimizer.optimize(&plan, &config, &observe)
}

struct MySchemaProvider {}
#[derive(Default)]
struct MySchemaProvider {
options: ConfigOptions,
}

impl ContextProvider for MySchemaProvider {
fn get_table_provider(
Expand Down Expand Up @@ -390,11 +394,8 @@ impl ContextProvider for MySchemaProvider {
None
}

fn get_config_option(
&self,
_variable: &str,
) -> Option<datafusion_common::ScalarValue> {
None
fn options(&self) -> &ConfigOptions {
&self.options
}
}

Expand Down
14 changes: 8 additions & 6 deletions datafusion/sql/examples/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use arrow_schema::{DataType, Field, Schema};
use datafusion_common::config::ConfigOptions;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::{
logical_plan::builder::LogicalTableSource, AggregateUDF, ScalarUDF, TableSource,
Expand Down Expand Up @@ -56,6 +57,7 @@ fn main() {
}

struct MySchemaProvider {
options: ConfigOptions,
tables: HashMap<String, Arc<dyn TableSource>>,
}

Expand Down Expand Up @@ -88,7 +90,10 @@ impl MySchemaProvider {
Field::new("price", DataType::Decimal128(10, 2), false),
]),
);
Self { tables }
Self {
tables,
options: Default::default(),
}
}
}

Expand Down Expand Up @@ -121,10 +126,7 @@ impl ContextProvider for MySchemaProvider {
None
}

fn get_config_option(
&self,
_variable: &str,
) -> Option<datafusion_common::ScalarValue> {
None
fn options(&self) -> &ConfigOptions {
&self.options
}
}
Loading