Skip to content

Commit

Permalink
Optimize replace_params_with_values (#13308)
Browse files Browse the repository at this point in the history
* Add benchmark for `with_param_values`

Target of this benchmark: control that placeholders replacing does not get slower,
if the query does not contain placeholders at all.

* Do not save original expression name if it does not contain placeholders

Prior this patch, the original expression name was unconditionally preserved during the replacement of placeholders.
However, in some cases, this is unnecessary. If the expression does not contain any placeholders, its name remains unchanged, and we do not need to preserve it.

This patch adds the following optimization:

- If during first pass (placeholders type inference) was investigated that there are no placeholders, do not save the original name,
  do not apply second pass (replacing).
  Just leave the expression as it is and return `Transformed::no(...)`.
  • Loading branch information
askalt authored Nov 10, 2024
1 parent 31d27c2 commit 36e3415
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 7 deletions.
29 changes: 29 additions & 0 deletions datafusion/core/benches/sql_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ mod data_utils;

use crate::criterion::Criterion;
use arrow::datatypes::{DataType, Field, Fields, Schema};
use criterion::Bencher;
use datafusion::datasource::MemTable;
use datafusion::execution::context::SessionContext;
use datafusion_common::ScalarValue;
use itertools::Itertools;
use std::fs::File;
use std::io::{BufRead, BufReader};
Expand Down Expand Up @@ -122,6 +124,29 @@ fn register_clickbench_hits_table() -> SessionContext {
ctx
}

/// Target of this benchmark: control that placeholders replacing does not get slower,
/// if the query does not contain placeholders at all.
fn benchmark_with_param_values_many_columns(ctx: &SessionContext, b: &mut Bencher) {
const COLUMNS_NUM: usize = 200;
let mut aggregates = String::new();
for i in 0..COLUMNS_NUM {
if i > 0 {
aggregates.push_str(", ");
}
aggregates.push_str(format!("MAX(a{})", i).as_str());
}
// SELECT max(attr0), ..., max(attrN) FROM t1.
let query = format!("SELECT {} FROM t1", aggregates);
let statement = ctx.state().sql_to_statement(&query, "Generic").unwrap();
let rt = Runtime::new().unwrap();
let plan =
rt.block_on(async { ctx.state().statement_to_plan(statement).await.unwrap() });
b.iter(|| {
let plan = plan.clone();
criterion::black_box(plan.with_param_values(vec![ScalarValue::from(1)]).unwrap());
});
}

fn criterion_benchmark(c: &mut Criterion) {
// verify that we can load the clickbench data prior to running the benchmark
if !PathBuf::from(format!("{BENCHMARKS_PATH_1}{CLICKBENCH_DATA_PATH}")).exists()
Expand Down Expand Up @@ -388,6 +413,10 @@ fn criterion_benchmark(c: &mut Criterion) {
}
})
});

c.bench_function("with_param_values_many_columns", |b| {
benchmark_with_param_values_many_columns(&ctx, b);
});
}

criterion_group!(benches, criterion_benchmark);
Expand Down
10 changes: 9 additions & 1 deletion datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1598,7 +1598,11 @@ impl Expr {
///
/// For example, gicen an expression like `<int32> = $0` will infer `$0` to
/// have type `int32`.
pub fn infer_placeholder_types(self, schema: &DFSchema) -> Result<Expr> {
///
/// Returns transformed expression and flag that is true if expression contains
/// at least one placeholder.
pub fn infer_placeholder_types(self, schema: &DFSchema) -> Result<(Expr, bool)> {
let mut has_placeholder = false;
self.transform(|mut expr| {
// Default to assuming the arguments are the same type
if let Expr::BinaryExpr(BinaryExpr { left, op: _, right }) = &mut expr {
Expand All @@ -1615,9 +1619,13 @@ impl Expr {
rewrite_placeholder(low.as_mut(), expr.as_ref(), schema)?;
rewrite_placeholder(high.as_mut(), expr.as_ref(), schema)?;
}
if let Expr::Placeholder(_) = &expr {
has_placeholder = true;
}
Ok(Transformed::yes(expr))
})
.data()
.map(|data| (data, has_placeholder))
}

/// Returns true if some of this `exprs` subexpressions may not be evaluated
Expand Down
17 changes: 12 additions & 5 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1413,18 +1413,25 @@ impl LogicalPlan {
let schema = Arc::clone(plan.schema());
let name_preserver = NamePreserver::new(&plan);
plan.map_expressions(|e| {
let original_name = name_preserver.save(&e);
let transformed_expr =
e.infer_placeholder_types(&schema)?.transform_up(|e| {
let (e, has_placeholder) = e.infer_placeholder_types(&schema)?;
if !has_placeholder {
// Performance optimization:
// avoid NamePreserver copy and second pass over expression
// if no placeholders.
Ok(Transformed::no(e))
} else {
let original_name = name_preserver.save(&e);
let transformed_expr = e.transform_up(|e| {
if let Expr::Placeholder(Placeholder { id, .. }) = e {
let value = param_values.get_placeholders_with_values(&id)?;
Ok(Transformed::yes(Expr::Literal(value)))
} else {
Ok(Transformed::no(e))
}
})?;
// Preserve name to avoid breaking column references to this expression
Ok(transformed_expr.update_data(|expr| original_name.restore(expr)))
// Preserve name to avoid breaking column references to this expression
Ok(transformed_expr.update_data(|expr| original_name.restore(expr)))
}
})
})
.map(|res| res.data)
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let mut expr = self.sql_expr_to_logical_expr(sql, schema, planner_context)?;
expr = self.rewrite_partial_qualifier(expr, schema);
self.validate_schema_satisfies_exprs(schema, &[expr.clone()])?;
let expr = expr.infer_placeholder_types(schema)?;
let (expr, _) = expr.infer_placeholder_types(schema)?;
Ok(expr)
}

Expand Down

0 comments on commit 36e3415

Please sign in to comment.