Skip to content

Commit

Permalink
Move ConfigOptions to core (#4803)
Browse files Browse the repository at this point in the history
* Move ConfigOptions to core

* Fix defaults

* Toml format
  • Loading branch information
tustvold authored Jan 4, 2023
1 parent ae1465d commit e1dc962
Show file tree
Hide file tree
Showing 13 changed files with 65 additions and 109 deletions.
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 8 additions & 7 deletions datafusion-examples/examples/rewrite_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::config::ConfigOptions;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter};
use datafusion_expr::{
Expand All @@ -37,7 +38,7 @@ pub fn main() -> Result<()> {
let statements = Parser::parse_sql(&dialect, sql)?;

// produce a logical plan using the datafusion-sql crate
let context_provider = MyContextProvider {};
let context_provider = MyContextProvider::default();
let sql_to_rel = SqlToRel::new(&context_provider);
let logical_plan = sql_to_rel.sql_statement_to_plan(statements[0].clone())?;
println!(
Expand Down Expand Up @@ -120,7 +121,10 @@ impl ExprRewriter for MyExprRewriter {
}
}

struct MyContextProvider {}
#[derive(Default)]
struct MyContextProvider {
options: ConfigOptions,
}

impl ContextProvider for MyContextProvider {
fn get_table_provider(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
Expand Down Expand Up @@ -148,11 +152,8 @@ impl ContextProvider for MyContextProvider {
None
}

fn get_config_option(
&self,
_variable: &str,
) -> Option<datafusion_common::ScalarValue> {
None
fn options(&self) -> &ConfigOptions {
&self.options
}
}

Expand Down
1 change: 1 addition & 0 deletions datafusion/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ apache-avro = { version = "0.14", default-features = false, features = ["snappy"
arrow = { version = "29.0.0", default-features = false }
chrono = { version = "0.4", default-features = false }
cranelift-module = { version = "0.89.0", optional = true }
num_cpus = "1.13.0"
object_store = { version = "0.5.0", default-features = false, optional = true }
parquet = { version = "29.0.0", default-features = false, optional = true }
pyo3 = { version = "0.17.1", optional = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//! DataFusion Configuration Options

use datafusion_common::{DataFusionError, Result};
use crate::{DataFusionError, Result};
use std::any::Any;
use std::collections::BTreeMap;
use std::fmt::Display;
Expand Down
1 change: 1 addition & 0 deletions datafusion/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
pub mod bisect;
pub mod cast;
mod column;
pub mod config;
pub mod delta;
mod dfschema;
mod error;
Expand Down
32 changes: 4 additions & 28 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1757,18 +1757,8 @@ impl ContextProvider for SessionState {
.and_then(|provider| provider.get(&provider_type)?.get_type(variable_names))
}

fn get_config_option(&self, variable: &str) -> Option<ScalarValue> {
// TOOD: Move ConfigOptions into common crate
match variable {
"datafusion.execution.time_zone" => self
.config
.options
.execution
.time_zone
.as_ref()
.map(|s| ScalarValue::Utf8(Some(s.clone()))),
_ => unimplemented!(),
}
fn options(&self) -> &ConfigOptions {
self.config_options()
}
}

Expand Down Expand Up @@ -1803,22 +1793,8 @@ impl OptimizerConfig for SessionState {
self.execution_props.query_execution_start_time
}

fn rule_enabled(&self, name: &str) -> bool {
use datafusion_optimizer::filter_null_join_keys::FilterNullJoinKeys;
match name {
FilterNullJoinKeys::NAME => {
self.config_options().optimizer.filter_null_join_keys
}
_ => true,
}
}

fn skip_failing_rules(&self) -> bool {
self.config_options().optimizer.skip_failed_rules
}

fn max_passes(&self) -> u8 {
self.config_options().optimizer.max_passes as _
fn options(&self) -> &ConfigOptions {
self.config_options()
}
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,6 @@ extern crate sqlparser;

pub mod avro_to_arrow;
pub mod catalog;
pub mod config;
pub mod dataframe;
pub mod datasource;
pub mod error;
Expand All @@ -234,6 +233,7 @@ pub use parquet;

// re-export DataFusion crates
pub use datafusion_common as common;
pub use datafusion_common::config;
pub use datafusion_expr as logical_expr;
pub use datafusion_optimizer as optimizer;
pub use datafusion_physical_expr as physical_expr;
Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/tests/sqllogictests/src/insert/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
// under the License.

use arrow::datatypes::DataType;
use datafusion_common::{ScalarValue, TableReference};
use datafusion_common::config::ConfigOptions;
use datafusion_common::TableReference;
use datafusion_expr::{AggregateUDF, ScalarUDF, TableSource};
use datafusion_sql::planner::ContextProvider;
use std::sync::Arc;
Expand Down Expand Up @@ -44,7 +45,7 @@ impl ContextProvider for LogicTestContextProvider {
todo!()
}

fn get_config_option(&self, _variable: &str) -> Option<ScalarValue> {
fn options(&self) -> &ConfigOptions {
todo!()
}
}
6 changes: 5 additions & 1 deletion datafusion/optimizer/src/filter_null_join_keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,12 @@ impl OptimizerRule for FilterNullJoinKeys {
fn try_optimize(
&self,
plan: &LogicalPlan,
_config: &dyn OptimizerConfig,
config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
if !config.options().optimizer.filter_null_join_keys {
return Ok(None);
}

match plan {
LogicalPlan::Join(join) if join.join_type == JoinType::Inner => {
let mut join = join.clone();
Expand Down
52 changes: 16 additions & 36 deletions datafusion/optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use crate::single_distinct_to_groupby::SingleDistinctToGroupBy;
use crate::type_coercion::TypeCoercion;
use crate::unwrap_cast_in_comparison::UnwrapCastInComparison;
use chrono::{DateTime, Utc};
use datafusion_common::config::ConfigOptions;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::logical_plan::LogicalPlan;
use log::{debug, trace, warn};
Expand Down Expand Up @@ -74,14 +75,7 @@ pub trait OptimizerConfig {
/// time is used as the value for now()
fn query_execution_start_time(&self) -> DateTime<Utc>;

/// Returns false if the given rule should be skipped
fn rule_enabled(&self, name: &str) -> bool;

/// The optimizer will skip failing rules if this returns true
fn skip_failing_rules(&self) -> bool;

/// How many times to attempt to optimize the plan
fn max_passes(&self) -> u8;
fn options(&self) -> &ConfigOptions;
}

/// A standalone [`OptimizerConfig`] that can be used independently
Expand All @@ -91,28 +85,25 @@ pub struct OptimizerContext {
/// Query execution start time that can be used to rewrite
/// expressions such as `now()` to use a literal value instead
query_execution_start_time: DateTime<Utc>,
/// Option to skip rules that produce errors
skip_failing_rules: bool,
/// Specify whether to enable the filter_null_keys rule
filter_null_keys: bool,
/// Maximum number of times to run optimizer against a plan
max_passes: u8,

options: ConfigOptions,
}

impl OptimizerContext {
/// Create optimizer config
pub fn new() -> Self {
let mut options = ConfigOptions::default();
options.optimizer.filter_null_join_keys = true;

Self {
query_execution_start_time: Utc::now(),
skip_failing_rules: true,
filter_null_keys: true,
max_passes: 3,
options,
}
}

/// Specify whether to enable the filter_null_keys rule
pub fn filter_null_keys(mut self, filter_null_keys: bool) -> Self {
self.filter_null_keys = filter_null_keys;
self.options.optimizer.filter_null_join_keys = filter_null_keys;
self
}

Expand All @@ -129,13 +120,13 @@ impl OptimizerContext {
/// Specify whether the optimizer should skip rules that produce
/// errors, or fail the query
pub fn with_skip_failing_rules(mut self, b: bool) -> Self {
self.skip_failing_rules = b;
self.options.optimizer.skip_failed_rules = b;
self
}

/// Specify how many times to attempt to optimize the plan
pub fn with_max_passes(mut self, v: u8) -> Self {
self.max_passes = v;
self.options.optimizer.max_passes = v as usize;
self
}
}
Expand All @@ -152,16 +143,8 @@ impl OptimizerConfig for OptimizerContext {
self.query_execution_start_time
}

fn rule_enabled(&self, name: &str) -> bool {
self.filter_null_keys || name != FilterNullJoinKeys::NAME
}

fn skip_failing_rules(&self) -> bool {
self.skip_failing_rules
}

fn max_passes(&self) -> u8 {
self.max_passes
fn options(&self) -> &ConfigOptions {
&self.options
}
}

Expand Down Expand Up @@ -271,18 +254,15 @@ impl Optimizer {
where
F: FnMut(&LogicalPlan, &dyn OptimizerRule),
{
let options = config.options();
let start_time = Instant::now();
let mut plan_str = format!("{}", plan.display_indent());
let mut new_plan = plan.clone();
let mut i = 0;
while i < config.max_passes() {
while i < options.optimizer.max_passes {
log_plan(&format!("Optimizer input (pass {i})"), &new_plan);

for rule in &self.rules {
if !config.rule_enabled(rule.name()) {
debug!("Skipping rule {} due to optimizer config", rule.name());
continue;
}
let result = self.optimize_recursively(rule, &new_plan, config);

match result {
Expand All @@ -308,7 +288,7 @@ impl Optimizer {
);
}
Err(ref e) => {
if config.skip_failing_rules() {
if options.optimizer.skip_failed_rules {
// Note to future readers: if you see this warning it signals a
// bug in the DataFusion optimizer. Please consider filing a ticket
// https://github.com/apache/arrow-datafusion
Expand Down
15 changes: 8 additions & 7 deletions datafusion/optimizer/tests/integration-test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
use chrono::{DateTime, NaiveDateTime, Utc};
use datafusion_common::config::ConfigOptions;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::{AggregateUDF, LogicalPlan, ScalarUDF, TableSource};
use datafusion_optimizer::optimizer::Optimizer;
Expand Down Expand Up @@ -323,7 +324,7 @@ fn test_sql(sql: &str) -> Result<LogicalPlan> {
let statement = &ast[0];

// create a logical query plan
let schema_provider = MySchemaProvider {};
let schema_provider = MySchemaProvider::default();
let sql_to_rel = SqlToRel::new(&schema_provider);
let plan = sql_to_rel.sql_statement_to_plan(statement.clone()).unwrap();

Expand All @@ -338,7 +339,10 @@ fn test_sql(sql: &str) -> Result<LogicalPlan> {
optimizer.optimize(&plan, &config, &observe)
}

struct MySchemaProvider {}
#[derive(Default)]
struct MySchemaProvider {
options: ConfigOptions,
}

impl ContextProvider for MySchemaProvider {
fn get_table_provider(
Expand Down Expand Up @@ -390,11 +394,8 @@ impl ContextProvider for MySchemaProvider {
None
}

fn get_config_option(
&self,
_variable: &str,
) -> Option<datafusion_common::ScalarValue> {
None
fn options(&self) -> &ConfigOptions {
&self.options
}
}

Expand Down
14 changes: 8 additions & 6 deletions datafusion/sql/examples/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use arrow_schema::{DataType, Field, Schema};
use datafusion_common::config::ConfigOptions;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::{
logical_plan::builder::LogicalTableSource, AggregateUDF, ScalarUDF, TableSource,
Expand Down Expand Up @@ -56,6 +57,7 @@ fn main() {
}

struct MySchemaProvider {
options: ConfigOptions,
tables: HashMap<String, Arc<dyn TableSource>>,
}

Expand Down Expand Up @@ -88,7 +90,10 @@ impl MySchemaProvider {
Field::new("price", DataType::Decimal128(10, 2), false),
]),
);
Self { tables }
Self {
tables,
options: Default::default(),
}
}
}

Expand Down Expand Up @@ -121,10 +126,7 @@ impl ContextProvider for MySchemaProvider {
None
}

fn get_config_option(
&self,
_variable: &str,
) -> Option<datafusion_common::ScalarValue> {
None
fn options(&self) -> &ConfigOptions {
&self.options
}
}
Loading

0 comments on commit e1dc962

Please sign in to comment.