Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Show optimization errors in explain #4819

Merged
merged 4 commits into from
Jan 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::{
optimizer::PhysicalOptimizerRule,
},
};
use datafusion_expr::DescribeTable;
use datafusion_expr::{DescribeTable, StringifiedPlan};
pub use datafusion_physical_expr::execution_props::ExecutionProps;
use datafusion_physical_expr::var_provider::is_system_variables;
use parking_lot::RwLock;
Expand Down Expand Up @@ -1826,21 +1826,31 @@ impl SessionState {
let mut stringified_plans = e.stringified_plans.clone();

// optimize the child plan, capturing the output of each optimizer
let plan = self.optimizer.optimize(
let (plan, logical_optimization_succeeded) = match self.optimizer.optimize(
e.plan.as_ref(),
self,
|optimized_plan, optimizer| {
let optimizer_name = optimizer.name().to_string();
let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name };
stringified_plans.push(optimized_plan.to_stringified(plan_type));
},
)?;
) {
Ok(plan) => (Arc::new(plan), true),
Err(DataFusionError::Context(optimizer_name, err)) => {
let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name };
stringified_plans
.push(StringifiedPlan::new(plan_type, err.to_string()));
(e.plan.clone(), false)
}
Err(e) => return Err(e),
};

Ok(LogicalPlan::Explain(Explain {
verbose: e.verbose,
plan: Arc::new(plan),
plan,
stringified_plans,
schema: e.schema.clone(),
logical_optimization_succeeded,
}))
} else {
self.optimizer.optimize(plan, self, |_, _| {})
Expand Down
79 changes: 53 additions & 26 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ use datafusion_expr::expr::{
Like, TryCast, WindowFunction,
};
use datafusion_expr::expr_rewriter::unnormalize_cols;
use datafusion_expr::logical_plan;
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
use datafusion_expr::utils::expand_wildcard;
use datafusion_expr::{logical_plan, StringifiedPlan};
use datafusion_expr::{WindowFrame, WindowFrameBound};
use datafusion_optimizer::utils::unalias;
use datafusion_physical_expr::expressions::Literal;
Expand Down Expand Up @@ -1740,28 +1740,47 @@ impl DefaultPhysicalPlanner {

if !config.physical_plan_only {
stringified_plans = e.stringified_plans.clone();
stringified_plans.push(e.plan.to_stringified(FinalLogicalPlan));
if e.logical_optimization_succeeded {
stringified_plans.push(e.plan.to_stringified(FinalLogicalPlan));
}
}

if !config.logical_plan_only {
let input = self
if !config.logical_plan_only && e.logical_optimization_succeeded {
match self
.create_initial_plan(e.plan.as_ref(), session_state)
.await?;

stringified_plans.push(
displayable(input.as_ref()).to_stringified(InitialPhysicalPlan),
);

let input =
self.optimize_internal(input, session_state, |plan, optimizer| {
let optimizer_name = optimizer.name().to_string();
let plan_type = OptimizedPhysicalPlan { optimizer_name };
stringified_plans
.push(displayable(plan).to_stringified(plan_type));
})?;
.await
{
Ok(input) => {
stringified_plans.push(
displayable(input.as_ref())
.to_stringified(InitialPhysicalPlan),
);

stringified_plans
.push(displayable(input.as_ref()).to_stringified(FinalPhysicalPlan));
match self.optimize_internal(
input,
session_state,
|plan, optimizer| {
let optimizer_name = optimizer.name().to_string();
let plan_type = OptimizedPhysicalPlan { optimizer_name };
stringified_plans
.push(displayable(plan).to_stringified(plan_type));
},
) {
Ok(input) => stringified_plans.push(
displayable(input.as_ref())
.to_stringified(FinalPhysicalPlan),
),
Err(DataFusionError::Context(optimizer_name, e)) => {
let plan_type = OptimizedPhysicalPlan { optimizer_name };
stringified_plans
.push(StringifiedPlan::new(plan_type, e.to_string()))
}
Err(e) => return Err(e),
}
}
Err(e) => stringified_plans
.push(StringifiedPlan::new(InitialPhysicalPlan, e.to_string())),
}
}

Ok(Some(Arc::new(ExplainExec::new(
Expand Down Expand Up @@ -1795,14 +1814,22 @@ impl DefaultPhysicalPlanner {
let mut new_plan = plan;
for optimizer in optimizers {
let before_schema = new_plan.schema();
new_plan = optimizer.optimize(new_plan, session_state.config_options())?;
new_plan = optimizer
.optimize(new_plan, session_state.config_options())
.map_err(|e| {
DataFusionError::Context(optimizer.name().to_string(), Box::new(e))
})?;
if optimizer.schema_check() && new_plan.schema() != before_schema {
return Err(DataFusionError::Internal(format!(
"PhysicalOptimizer rule '{}' failed, due to generate a different schema, original schema: {:?}, new schema: {:?}",
optimizer.name(),
before_schema,
new_plan.schema()
)));
let e = DataFusionError::Internal(format!(
"PhysicalOptimizer rule '{}' failed, due to generate a different schema, original schema: {:?}, new schema: {:?}",
optimizer.name(),
before_schema,
new_plan.schema()
));
return Err(DataFusionError::Context(
optimizer.name().to_string(),
Box::new(e),
));
}
trace!(
"Optimized physical plan by {}:\n{}\n",
Expand Down
1 change: 1 addition & 0 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,7 @@ impl LogicalPlanBuilder {
plan: Arc::new(self.plan),
stringified_plans,
schema,
logical_optimization_succeeded: false,
})))
}
}
Expand Down
2 changes: 2 additions & 0 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1658,6 +1658,8 @@ pub struct Explain {
pub stringified_plans: Vec<StringifiedPlan>,
/// The output schema of the explain (2 columns of text)
pub schema: DFSchemaRef,
/// Used by physical planner to check if should proceed with planning
pub logical_optimization_succeeded: bool,
}

/// Runs the actual plan, and then prints the physical plan with
Expand Down
22 changes: 16 additions & 6 deletions datafusion/optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,12 +268,16 @@ impl Optimizer {
match result {
Ok(Some(plan)) => {
if !plan.schema().equivalent_names_and_types(new_plan.schema()) {
return Err(DataFusionError::Internal(format!(
let e = DataFusionError::Internal(format!(
"Optimizer rule '{}' failed, due to generate a different schema, original schema: {:?}, new schema: {:?}",
rule.name(),
new_plan.schema(),
plan.schema()
)));
));
return Err(DataFusionError::Context(
rule.name().to_string(),
Box::new(e),
));
}
new_plan = plan;
observer(&new_plan, rule.as_ref());
Expand All @@ -298,11 +302,15 @@ impl Optimizer {
e
);
} else {
return Err(DataFusionError::Internal(format!(
let e = DataFusionError::Internal(format!(
"Optimizer rule '{}' failed due to unexpected error: {}",
rule.name(),
e
)));
));
return Err(DataFusionError::Context(
rule.name().to_string(),
Box::new(e),
));
}
}
}
Expand Down Expand Up @@ -436,7 +444,8 @@ mod tests {
});
let err = opt.optimize(&plan, &config, &observe).unwrap_err();
assert_eq!(
"Internal error: Optimizer rule 'bad rule' failed due to unexpected error: \
"bad rule\ncaused by\n\
Internal error: Optimizer rule 'bad rule' failed due to unexpected error: \
Error during planning: rule failed. This was likely caused by a bug in \
DataFusion's code and we would welcome that you file an bug report in our issue tracker",
err.to_string()
Expand All @@ -453,7 +462,8 @@ mod tests {
});
let err = opt.optimize(&plan, &config, &observe).unwrap_err();
assert_eq!(
"Internal error: Optimizer rule 'get table_scan rule' failed, due to generate a different schema, \
"get table_scan rule\ncaused by\n\
Internal error: Optimizer rule 'get table_scan rule' failed, due to generate a different schema, \
original schema: DFSchema { fields: [], metadata: {} }, \
new schema: DFSchema { fields: [\
DFField { qualifier: Some(\"test\"), field: Field { name: \"a\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, \
Expand Down
1 change: 1 addition & 0 deletions datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
plan,
stringified_plans,
schema,
logical_optimization_succeeded: false,
}))
}
}
Expand Down