From 2198ec1dac6f4bb23014baa714eb3173276a4ba2 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 26 Jun 2021 00:26:20 +0200 Subject: [PATCH 01/12] Optimize count(*) with table statistics --- datafusion/src/execution/context.rs | 4 +++- datafusion/src/optimizer/mod.rs | 1 + datafusion/src/optimizer/utils.rs | 2 ++ 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 165263084cc7..e35e72493c42 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -22,7 +22,8 @@ use crate::{ information_schema::CatalogWithInformationSchema, }, optimizer::{ - eliminate_limit::EliminateLimit, hash_build_probe_order::HashBuildProbeOrder, + count_statistics::StatisticsConstant, eliminate_limit::EliminateLimit, + hash_build_probe_order::HashBuildProbeOrder, }, physical_optimizer::optimizer::PhysicalOptimizerRule, }; @@ -639,6 +640,7 @@ impl Default for ExecutionConfig { Arc::new(SimplifyExpressions::new()), Arc::new(HashBuildProbeOrder::new()), Arc::new(LimitPushDown::new()), + Arc::new(StatisticsConstant::new()), ], physical_optimizers: vec![ Arc::new(CoalesceBatches::new()), diff --git a/datafusion/src/optimizer/mod.rs b/datafusion/src/optimizer/mod.rs index e360a54f2a96..5700a2f9c85f 100644 --- a/datafusion/src/optimizer/mod.rs +++ b/datafusion/src/optimizer/mod.rs @@ -19,6 +19,7 @@ //! some simple rules to a logical plan, such as "Projection Push Down" and "Type Coercion". pub mod constant_folding; +pub mod count_statistics; pub mod eliminate_limit; pub mod filter_push_down; pub mod hash_build_probe_order; diff --git a/datafusion/src/optimizer/utils.rs b/datafusion/src/optimizer/utils.rs index 76f44b84657c..861002f6aa59 100644 --- a/datafusion/src/optimizer/utils.rs +++ b/datafusion/src/optimizer/utils.rs @@ -577,3 +577,5 @@ mod tests { Ok(()) } } + + From d8e65dbfd819389a08ebe5d039142ee9646d9707 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 26 Jun 2021 00:40:48 +0200 Subject: [PATCH 02/12] Optimize count(*) with table statistics --- datafusion/src/execution/context.rs | 2 +- datafusion/src/optimizer/count_statistics.rs | 175 +++++++++++++++++++ 2 files changed, 176 insertions(+), 1 deletion(-) create mode 100644 datafusion/src/optimizer/count_statistics.rs diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index e35e72493c42..8f85b78dff2a 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -635,12 +635,12 @@ impl Default for ExecutionConfig { optimizers: vec![ Arc::new(ConstantFolding::new()), Arc::new(EliminateLimit::new()), + Arc::new(StatisticsConstant::new()), Arc::new(ProjectionPushDown::new()), Arc::new(FilterPushDown::new()), Arc::new(SimplifyExpressions::new()), Arc::new(HashBuildProbeOrder::new()), Arc::new(LimitPushDown::new()), - Arc::new(StatisticsConstant::new()), ], physical_optimizers: vec![ Arc::new(CoalesceBatches::new()), diff --git a/datafusion/src/optimizer/count_statistics.rs b/datafusion/src/optimizer/count_statistics.rs new file mode 100644 index 000000000000..16fb8a1ded10 --- /dev/null +++ b/datafusion/src/optimizer/count_statistics.rs @@ -0,0 +1,175 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Utilizing exact statistics from sources to avoid scanning data +use crate::{ + execution::context::ExecutionProps, + logical_plan::{Expr, LogicalPlan}, + physical_plan::aggregates::AggregateFunction, + scalar::ScalarValue, +}; + +use super::{optimizer::OptimizerRule, utils}; +use crate::error::Result; + +/// Optimizer that uses available statistics for aggregate functions +pub struct StatisticsConstant {} + +impl StatisticsConstant { + #[allow(missing_docs)] + pub fn new() -> Self { + Self {} + } +} + +impl OptimizerRule for StatisticsConstant { + fn optimize( + &self, + plan: &LogicalPlan, + execution_props: &ExecutionProps, + ) -> crate::error::Result { + match plan { + LogicalPlan::Aggregate { + input, + group_expr, + aggr_expr, + schema, + } if group_expr.len() == 0 && aggr_expr.len() == 1 => { + if let Some(num_rows) = match input.as_ref() { + LogicalPlan::TableScan { source, .. } => source.statistics().num_rows, + _ => None, + } { + let expr: Vec = aggr_expr + .iter() + .map(|e| match e { + Expr::AggregateFunction { + fun: AggregateFunction::Count, + args, + distinct: false, + } if args + == &[Expr::Literal(ScalarValue::UInt8(Some(1)))] => + { + Expr::Alias( + Box::new(Expr::Literal(ScalarValue::UInt64(Some( + num_rows as u64, + )))), + "#COUNT(Uint8(1))".to_string(), + ) + } + _ => e.clone(), + }) + .collect(); + + return Ok(LogicalPlan::Projection { + expr, + input: input.clone(), + schema: schema.clone(), + }); + } + + Ok(plan.clone()) + } + // Rest: recurse and find possible statistics + _ => { + let expr = plan.expressions(); + + // apply the optimization to all inputs of the plan + let inputs = plan.inputs(); + let new_inputs = inputs + .iter() + .map(|plan| self.optimize(plan, execution_props)) + .collect::>>()?; + + utils::from_plan(plan, &expr, &new_inputs) + } + } + } + + fn name(&self) -> &str { + "count_statistics" + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use arrow::datatypes::{DataType, Field, Schema}; + + use crate::error::Result; + use crate::logical_plan::LogicalPlan; + use crate::{ + datasource::{datasource::Statistics, TableProvider}, + logical_plan::Expr, + }; + + struct TestTableProvider { + num_rows: usize, + } + + impl TableProvider for TestTableProvider { + fn as_any(&self) -> &dyn std::any::Any { + unimplemented!() + } + fn schema(&self) -> arrow::datatypes::SchemaRef { + Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])) + } + + fn scan( + &self, + _projection: &Option>, + _batch_size: usize, + _filters: &[Expr], + _limit: Option, + ) -> Result> { + unimplemented!() + } + fn statistics(&self) -> crate::datasource::datasource::Statistics { + Statistics { + num_rows: Some(self.num_rows), + total_byte_size: None, + column_statistics: None, + } + } + } + + #[test] + fn optimize_count() -> Result<()> { + use crate::execution::context::ExecutionContext; + let mut ctx = ExecutionContext::new(); + ctx.register_table("test", Arc::new(TestTableProvider { num_rows: 100 })) + .unwrap(); + + let plan = ctx + .sql("select count(*) from test") + .unwrap() + .to_logical_plan(); + let expected = "\ + Projection: #COUNT(UInt8(1))\ + \n Projection: UInt64(100) AS #COUNT(Uint8(1))\ + \n TableScan: test projection=Some([])"; + + assert_optimized_plan_eq(&plan, expected); + Ok(()) + } + + fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { + let formatted_plan = format!("{:?}", plan); + assert_eq!(formatted_plan, expected); + assert_eq!(plan.schema(), plan.schema()); + } +} From fe96d1a401e785431a83a604662a0ca54ee4aa3d Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 26 Jun 2021 12:10:05 +0200 Subject: [PATCH 03/12] Fixes, simplification --- datafusion/src/optimizer/count_statistics.rs | 47 ++++++++++---------- 1 file changed, 23 insertions(+), 24 deletions(-) diff --git a/datafusion/src/optimizer/count_statistics.rs b/datafusion/src/optimizer/count_statistics.rs index 16fb8a1ded10..e722fbdaa060 100644 --- a/datafusion/src/optimizer/count_statistics.rs +++ b/datafusion/src/optimizer/count_statistics.rs @@ -16,9 +16,11 @@ // under the License. //! Utilizing exact statistics from sources to avoid scanning data +use std::sync::Arc; + use crate::{ execution::context::ExecutionProps, - logical_plan::{Expr, LogicalPlan}, + logical_plan::{DFSchema, Expr, LogicalPlan}, physical_plan::aggregates::AggregateFunction, scalar::ScalarValue, }; @@ -43,6 +45,7 @@ impl OptimizerRule for StatisticsConstant { execution_props: &ExecutionProps, ) -> crate::error::Result { match plan { + // match only select count(*) from table_scan LogicalPlan::Aggregate { input, group_expr, @@ -53,32 +56,28 @@ impl OptimizerRule for StatisticsConstant { LogicalPlan::TableScan { source, .. } => source.statistics().num_rows, _ => None, } { - let expr: Vec = aggr_expr - .iter() - .map(|e| match e { - Expr::AggregateFunction { - fun: AggregateFunction::Count, - args, - distinct: false, - } if args - == &[Expr::Literal(ScalarValue::UInt8(Some(1)))] => - { - Expr::Alias( + return match &aggr_expr[0] { + Expr::AggregateFunction { + fun: AggregateFunction::Count, + args, + distinct: false, + } if args == &[Expr::Literal(ScalarValue::UInt8(Some(1)))] => { + Ok(LogicalPlan::Projection { + expr: vec![Expr::Alias( Box::new(Expr::Literal(ScalarValue::UInt64(Some( num_rows as u64, )))), "#COUNT(Uint8(1))".to_string(), - ) - } - _ => e.clone(), - }) - .collect(); - - return Ok(LogicalPlan::Projection { - expr, - input: input.clone(), - schema: schema.clone(), - }); + )], + input: Arc::new(LogicalPlan::EmptyRelation { + produce_one_row: true, + schema: Arc::new(DFSchema::empty()), + }), + schema: schema.clone(), + }) + } + _ => Ok(plan.clone()), + }; } Ok(plan.clone()) @@ -161,7 +160,7 @@ mod tests { let expected = "\ Projection: #COUNT(UInt8(1))\ \n Projection: UInt64(100) AS #COUNT(Uint8(1))\ - \n TableScan: test projection=Some([])"; + \n EmptyRelation"; assert_optimized_plan_eq(&plan, expected); Ok(()) From 61db2336aa36e3132f5cd0a9c13e6b8ab227460f Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 26 Jun 2021 12:56:38 +0200 Subject: [PATCH 04/12] Alias fix --- datafusion/src/optimizer/count_statistics.rs | 4 ++-- datafusion/src/optimizer/utils.rs | 2 -- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/datafusion/src/optimizer/count_statistics.rs b/datafusion/src/optimizer/count_statistics.rs index e722fbdaa060..b0ec22d0d6c2 100644 --- a/datafusion/src/optimizer/count_statistics.rs +++ b/datafusion/src/optimizer/count_statistics.rs @@ -67,7 +67,7 @@ impl OptimizerRule for StatisticsConstant { Box::new(Expr::Literal(ScalarValue::UInt64(Some( num_rows as u64, )))), - "#COUNT(Uint8(1))".to_string(), + "COUNT(Uint8(1))".to_string(), )], input: Arc::new(LogicalPlan::EmptyRelation { produce_one_row: true, @@ -159,7 +159,7 @@ mod tests { .to_logical_plan(); let expected = "\ Projection: #COUNT(UInt8(1))\ - \n Projection: UInt64(100) AS #COUNT(Uint8(1))\ + \n Projection: UInt64(100) AS COUNT(Uint8(1))\ \n EmptyRelation"; assert_optimized_plan_eq(&plan, expected); diff --git a/datafusion/src/optimizer/utils.rs b/datafusion/src/optimizer/utils.rs index 861002f6aa59..76f44b84657c 100644 --- a/datafusion/src/optimizer/utils.rs +++ b/datafusion/src/optimizer/utils.rs @@ -577,5 +577,3 @@ mod tests { Ok(()) } } - - From 3687361bbb2e89ef242ee37cd6def62d24cdfdb8 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 26 Jun 2021 15:11:02 +0200 Subject: [PATCH 05/12] Add member to table provider to return whether statistics are exact --- datafusion/src/datasource/datasource.rs | 5 +++++ datafusion/src/datasource/memory.rs | 4 ++++ datafusion/src/datasource/parquet.rs | 4 ++++ datafusion/src/optimizer/count_statistics.rs | 8 ++++++-- 4 files changed, 19 insertions(+), 2 deletions(-) diff --git a/datafusion/src/datasource/datasource.rs b/datafusion/src/datasource/datasource.rs index 0349a49e491b..b83aa4b1ab56 100644 --- a/datafusion/src/datasource/datasource.rs +++ b/datafusion/src/datasource/datasource.rs @@ -108,6 +108,11 @@ pub trait TableProvider: Sync + Send { /// Statistics should be optional because not all data sources can provide statistics. fn statistics(&self) -> Statistics; + /// Returns whether statistics provided are exact values or estimates + fn has_exact_statistics(&self) -> bool { + false + } + /// Tests whether the table provider can make use of a filter expression /// to optimise data retrieval. fn supports_filter_pushdown( diff --git a/datafusion/src/datasource/memory.rs b/datafusion/src/datasource/memory.rs index af4048087028..a4dbfd6c4a24 100644 --- a/datafusion/src/datasource/memory.rs +++ b/datafusion/src/datasource/memory.rs @@ -216,6 +216,10 @@ impl TableProvider for MemTable { fn statistics(&self) -> Statistics { self.statistics.clone() } + + fn has_exact_statistics(&self) -> bool { + true + } } #[cfg(test)] diff --git a/datafusion/src/datasource/parquet.rs b/datafusion/src/datasource/parquet.rs index abfb81d99887..b4b71d5e53f4 100644 --- a/datafusion/src/datasource/parquet.rs +++ b/datafusion/src/datasource/parquet.rs @@ -101,6 +101,10 @@ impl TableProvider for ParquetTable { fn statistics(&self) -> Statistics { self.statistics.clone() } + + fn has_exact_statistics(&self) -> bool { + true + } } #[cfg(test)] diff --git a/datafusion/src/optimizer/count_statistics.rs b/datafusion/src/optimizer/count_statistics.rs index b0ec22d0d6c2..4cee3c668b58 100644 --- a/datafusion/src/optimizer/count_statistics.rs +++ b/datafusion/src/optimizer/count_statistics.rs @@ -51,9 +51,13 @@ impl OptimizerRule for StatisticsConstant { group_expr, aggr_expr, schema, - } if group_expr.len() == 0 && aggr_expr.len() == 1 => { + } if group_expr.is_empty() && aggr_expr.is_empty() => { if let Some(num_rows) = match input.as_ref() { - LogicalPlan::TableScan { source, .. } => source.statistics().num_rows, + LogicalPlan::TableScan { source, .. } + if source.has_exact_statistics() => + { + source.statistics().num_rows + } _ => None, } { return match &aggr_expr[0] { From cc6b73abfec2721176fc4a191c79531c51db91a2 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 26 Jun 2021 15:13:38 +0200 Subject: [PATCH 06/12] Fix --- datafusion/src/optimizer/count_statistics.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/src/optimizer/count_statistics.rs b/datafusion/src/optimizer/count_statistics.rs index 4cee3c668b58..bdc735ce7cfe 100644 --- a/datafusion/src/optimizer/count_statistics.rs +++ b/datafusion/src/optimizer/count_statistics.rs @@ -51,7 +51,7 @@ impl OptimizerRule for StatisticsConstant { group_expr, aggr_expr, schema, - } if group_expr.is_empty() && aggr_expr.is_empty() => { + } if group_expr.is_empty() && aggr_expr.len() == 1 => { if let Some(num_rows) = match input.as_ref() { LogicalPlan::TableScan { source, .. } if source.has_exact_statistics() => From 9b2440d69d7ce02c38f766b6e68fa21ae23d3b9b Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 26 Jun 2021 15:24:02 +0200 Subject: [PATCH 07/12] Improve test --- datafusion/src/optimizer/count_statistics.rs | 21 +++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/datafusion/src/optimizer/count_statistics.rs b/datafusion/src/optimizer/count_statistics.rs index bdc735ce7cfe..760edafb1339 100644 --- a/datafusion/src/optimizer/count_statistics.rs +++ b/datafusion/src/optimizer/count_statistics.rs @@ -114,7 +114,10 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema}; use crate::error::Result; + use crate::execution::context::ExecutionProps; use crate::logical_plan::LogicalPlan; + use crate::optimizer::count_statistics::StatisticsConstant; + use crate::optimizer::optimizer::OptimizerRule; use crate::{ datasource::{datasource::Statistics, TableProvider}, logical_plan::Expr, @@ -148,6 +151,9 @@ mod tests { column_statistics: None, } } + fn has_exact_statistics(&self) -> bool { + true + } } #[test] @@ -158,20 +164,21 @@ mod tests { .unwrap(); let plan = ctx - .sql("select count(*) from test") - .unwrap() - .to_logical_plan(); + .create_logical_plan("select count(*) from test") + .unwrap(); let expected = "\ - Projection: #COUNT(UInt8(1))\ - \n Projection: UInt64(100) AS COUNT(Uint8(1))\ - \n EmptyRelation"; + Projection: #COUNT(UInt8(1))\ + \n Projection: UInt64(100) AS COUNT(Uint8(1))\ + \n EmptyRelation"; assert_optimized_plan_eq(&plan, expected); Ok(()) } fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { - let formatted_plan = format!("{:?}", plan); + let opt = StatisticsConstant::new(); + let optimized_plan = opt.optimize(plan, &ExecutionProps::new()).unwrap(); + let formatted_plan = format!("{:?}", optimized_plan); assert_eq!(formatted_plan, expected); assert_eq!(plan.schema(), plan.schema()); } From 1ac94631eeb1b7caa9ad9980589d3ff6786bc0d9 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 27 Jun 2021 09:03:46 +0200 Subject: [PATCH 08/12] Naming changes --- datafusion/src/execution/context.rs | 4 ++-- .../{count_statistics.rs => aggregate_statistics.rs} | 12 ++++++------ datafusion/src/optimizer/mod.rs | 2 +- 3 files changed, 9 insertions(+), 9 deletions(-) rename datafusion/src/optimizer/{count_statistics.rs => aggregate_statistics.rs} (95%) diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 8f85b78dff2a..79872d897b73 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -22,7 +22,7 @@ use crate::{ information_schema::CatalogWithInformationSchema, }, optimizer::{ - count_statistics::StatisticsConstant, eliminate_limit::EliminateLimit, + aggregate_statistics::AggregateStatistics, eliminate_limit::EliminateLimit, hash_build_probe_order::HashBuildProbeOrder, }, physical_optimizer::optimizer::PhysicalOptimizerRule, @@ -635,7 +635,7 @@ impl Default for ExecutionConfig { optimizers: vec![ Arc::new(ConstantFolding::new()), Arc::new(EliminateLimit::new()), - Arc::new(StatisticsConstant::new()), + Arc::new(AggregateStatistics::new()), Arc::new(ProjectionPushDown::new()), Arc::new(FilterPushDown::new()), Arc::new(SimplifyExpressions::new()), diff --git a/datafusion/src/optimizer/count_statistics.rs b/datafusion/src/optimizer/aggregate_statistics.rs similarity index 95% rename from datafusion/src/optimizer/count_statistics.rs rename to datafusion/src/optimizer/aggregate_statistics.rs index 760edafb1339..1c59f0d7c13d 100644 --- a/datafusion/src/optimizer/count_statistics.rs +++ b/datafusion/src/optimizer/aggregate_statistics.rs @@ -29,16 +29,16 @@ use super::{optimizer::OptimizerRule, utils}; use crate::error::Result; /// Optimizer that uses available statistics for aggregate functions -pub struct StatisticsConstant {} +pub struct AggregateStatistics {} -impl StatisticsConstant { +impl AggregateStatistics { #[allow(missing_docs)] pub fn new() -> Self { Self {} } } -impl OptimizerRule for StatisticsConstant { +impl OptimizerRule for AggregateStatistics { fn optimize( &self, plan: &LogicalPlan, @@ -116,7 +116,7 @@ mod tests { use crate::error::Result; use crate::execution::context::ExecutionProps; use crate::logical_plan::LogicalPlan; - use crate::optimizer::count_statistics::StatisticsConstant; + use crate::optimizer::aggregate_statistics::AggregateStatistics; use crate::optimizer::optimizer::OptimizerRule; use crate::{ datasource::{datasource::Statistics, TableProvider}, @@ -157,7 +157,7 @@ mod tests { } #[test] - fn optimize_count() -> Result<()> { + fn optimize_count_using_statistics() -> Result<()> { use crate::execution::context::ExecutionContext; let mut ctx = ExecutionContext::new(); ctx.register_table("test", Arc::new(TestTableProvider { num_rows: 100 })) @@ -176,7 +176,7 @@ mod tests { } fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { - let opt = StatisticsConstant::new(); + let opt = AggregateStatistics::new(); let optimized_plan = opt.optimize(plan, &ExecutionProps::new()).unwrap(); let formatted_plan = format!("{:?}", optimized_plan); assert_eq!(formatted_plan, expected); diff --git a/datafusion/src/optimizer/mod.rs b/datafusion/src/optimizer/mod.rs index 5700a2f9c85f..099a2572e2b4 100644 --- a/datafusion/src/optimizer/mod.rs +++ b/datafusion/src/optimizer/mod.rs @@ -19,7 +19,7 @@ //! some simple rules to a logical plan, such as "Projection Push Down" and "Type Coercion". pub mod constant_folding; -pub mod count_statistics; +pub mod aggregate_statistics; pub mod eliminate_limit; pub mod filter_push_down; pub mod hash_build_probe_order; From 12d356520bd857eec81b505c6cb2954e63d3d2c5 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 27 Jun 2021 09:10:50 +0200 Subject: [PATCH 09/12] Add test for non-exact statistics --- .../src/optimizer/aggregate_statistics.rs | 38 +++++++++++++++++-- datafusion/src/optimizer/mod.rs | 2 +- 2 files changed, 36 insertions(+), 4 deletions(-) diff --git a/datafusion/src/optimizer/aggregate_statistics.rs b/datafusion/src/optimizer/aggregate_statistics.rs index 1c59f0d7c13d..fb669c1b0cbc 100644 --- a/datafusion/src/optimizer/aggregate_statistics.rs +++ b/datafusion/src/optimizer/aggregate_statistics.rs @@ -125,6 +125,7 @@ mod tests { struct TestTableProvider { num_rows: usize, + is_exact: bool, } impl TableProvider for TestTableProvider { @@ -152,7 +153,7 @@ mod tests { } } fn has_exact_statistics(&self) -> bool { - true + self.is_exact } } @@ -160,8 +161,14 @@ mod tests { fn optimize_count_using_statistics() -> Result<()> { use crate::execution::context::ExecutionContext; let mut ctx = ExecutionContext::new(); - ctx.register_table("test", Arc::new(TestTableProvider { num_rows: 100 })) - .unwrap(); + ctx.register_table( + "test", + Arc::new(TestTableProvider { + num_rows: 100, + is_exact: true, + }), + ) + .unwrap(); let plan = ctx .create_logical_plan("select count(*) from test") @@ -175,6 +182,31 @@ mod tests { Ok(()) } + #[test] + fn optimize_count_not_exact() -> Result<()> { + use crate::execution::context::ExecutionContext; + let mut ctx = ExecutionContext::new(); + ctx.register_table( + "test", + Arc::new(TestTableProvider { + num_rows: 100, + is_exact: false, + }), + ) + .unwrap(); + + let plan = ctx + .create_logical_plan("select count(*) from test") + .unwrap(); + let expected = "\ + Projection: #COUNT(UInt8(1))\ + \n Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]\ + \n TableScan: test projection=None"; + + assert_optimized_plan_eq(&plan, expected); + Ok(()) + } + fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { let opt = AggregateStatistics::new(); let optimized_plan = opt.optimize(plan, &ExecutionProps::new()).unwrap(); diff --git a/datafusion/src/optimizer/mod.rs b/datafusion/src/optimizer/mod.rs index 099a2572e2b4..68758474d594 100644 --- a/datafusion/src/optimizer/mod.rs +++ b/datafusion/src/optimizer/mod.rs @@ -18,8 +18,8 @@ //! This module contains a query optimizer that operates against a logical plan and applies //! some simple rules to a logical plan, such as "Projection Push Down" and "Type Coercion". -pub mod constant_folding; pub mod aggregate_statistics; +pub mod constant_folding; pub mod eliminate_limit; pub mod filter_push_down; pub mod hash_build_probe_order; From 8474b5a9deb19336fc5f9f0f3f52a542fd0e9e48 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 27 Jun 2021 22:49:44 +0200 Subject: [PATCH 10/12] Generalize solution --- .../src/optimizer/aggregate_statistics.rs | 109 ++++++++++++++---- 1 file changed, 88 insertions(+), 21 deletions(-) diff --git a/datafusion/src/optimizer/aggregate_statistics.rs b/datafusion/src/optimizer/aggregate_statistics.rs index fb669c1b0cbc..164643dfc03a 100644 --- a/datafusion/src/optimizer/aggregate_statistics.rs +++ b/datafusion/src/optimizer/aggregate_statistics.rs @@ -16,11 +16,11 @@ // under the License. //! Utilizing exact statistics from sources to avoid scanning data -use std::sync::Arc; +use std::{sync::Arc, vec}; use crate::{ execution::context::ExecutionProps, - logical_plan::{DFSchema, Expr, LogicalPlan}, + logical_plan::{col, DFField, DFSchema, Expr, LogicalPlan}, physical_plan::aggregates::AggregateFunction, scalar::ScalarValue, }; @@ -51,7 +51,12 @@ impl OptimizerRule for AggregateStatistics { group_expr, aggr_expr, schema, - } if group_expr.is_empty() && aggr_expr.len() == 1 => { + } if group_expr.is_empty() => { + // aggregations that can not be replaced + // using statistics + let mut agg = vec![]; + // expressions that can be replaced by constants + let mut projections = vec![]; if let Some(num_rows) = match input.as_ref() { LogicalPlan::TableScan { source, .. } if source.has_exact_statistics() => @@ -60,30 +65,66 @@ impl OptimizerRule for AggregateStatistics { } _ => None, } { - return match &aggr_expr[0] { - Expr::AggregateFunction { - fun: AggregateFunction::Count, - args, - distinct: false, - } if args == &[Expr::Literal(ScalarValue::UInt8(Some(1)))] => { - Ok(LogicalPlan::Projection { - expr: vec![Expr::Alias( + for expr in aggr_expr { + match expr { + Expr::AggregateFunction { + fun: AggregateFunction::Count, + args, + distinct: false, + } if args + == &[Expr::Literal(ScalarValue::UInt8(Some(1)))] => + { + projections.push(Expr::Alias( Box::new(Expr::Literal(ScalarValue::UInt64(Some( num_rows as u64, )))), "COUNT(Uint8(1))".to_string(), - )], - input: Arc::new(LogicalPlan::EmptyRelation { - produce_one_row: true, - schema: Arc::new(DFSchema::empty()), - }), - schema: schema.clone(), - }) + )); + } + _ => { + agg.push(expr.clone()); + } } - _ => Ok(plan.clone()), - }; - } + } + return Ok(if agg.is_empty() { + // table scan can be entirely removed + + LogicalPlan::Projection { + expr: projections, + input: Arc::new(LogicalPlan::EmptyRelation { + produce_one_row: true, + schema: Arc::new(DFSchema::empty()), + }), + schema: schema.clone(), + } + } else if projections.is_empty() { + // no replacements -> return original plan + plan.clone() + } else { + // Split into parts that can be supported and part that should stay in aggregate + let agg_fields = agg + .iter() + .map(|x| x.to_field(input.schema())) + .collect::>>()?; + let agg_schema = DFSchema::new(agg_fields)?; + let cols = agg + .iter() + .map(|e| e.name(&agg_schema)) + .collect::>>()?; + projections.extend(cols.iter().map(|x| col(x))); + LogicalPlan::Projection { + expr: projections, + schema: schema.clone(), + input: Arc::new(LogicalPlan::Aggregate { + input: input.clone(), + group_expr: vec![], + aggr_expr: agg, + schema: Arc::new(agg_schema), + }), + } + }); + } Ok(plan.clone()) } // Rest: recurse and find possible statistics @@ -207,6 +248,32 @@ mod tests { Ok(()) } + #[test] + fn optimize_count_sum() -> Result<()> { + use crate::execution::context::ExecutionContext; + let mut ctx = ExecutionContext::new(); + ctx.register_table( + "test", + Arc::new(TestTableProvider { + num_rows: 100, + is_exact: true, + }), + ) + .unwrap(); + + let plan = ctx + .create_logical_plan("select sum(a)/count(*) from test") + .unwrap(); + let expected = "\ + Projection: #SUM(test.a) Divide #COUNT(UInt8(1))\ + \n Projection: UInt64(100) AS COUNT(Uint8(1)), #SUM(test.a)\ + \n Aggregate: groupBy=[[]], aggr=[[SUM(#test.a)]]\ + \n TableScan: test projection=None"; + + assert_optimized_plan_eq(&plan, expected); + Ok(()) + } + fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { let opt = AggregateStatistics::new(); let optimized_plan = opt.optimize(plan, &ExecutionProps::new()).unwrap(); From 441ed0425a6d7a833260704bd2960614214627e9 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 28 Jun 2021 07:47:27 +0200 Subject: [PATCH 11/12] Added tests --- .../src/optimizer/aggregate_statistics.rs | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/datafusion/src/optimizer/aggregate_statistics.rs b/datafusion/src/optimizer/aggregate_statistics.rs index 164643dfc03a..adc63a2af069 100644 --- a/datafusion/src/optimizer/aggregate_statistics.rs +++ b/datafusion/src/optimizer/aggregate_statistics.rs @@ -274,6 +274,57 @@ mod tests { Ok(()) } + #[test] + fn optimize_count_group_by() -> Result<()> { + use crate::execution::context::ExecutionContext; + let mut ctx = ExecutionContext::new(); + ctx.register_table( + "test", + Arc::new(TestTableProvider { + num_rows: 100, + is_exact: true, + }), + ) + .unwrap(); + + let plan = ctx + .create_logical_plan("SELECT count(*), a FROM test GROUP BY a") + .unwrap(); + let expected = "\ + Projection: #COUNT(UInt8(1)), #test.a\ + \n Aggregate: groupBy=[[#test.a]], aggr=[[COUNT(UInt8(1))]]\ + \n TableScan: test projection=None"; + + assert_optimized_plan_eq(&plan, expected); + Ok(()) + } + + #[test] + fn optimize_count_filter() -> Result<()> { + use crate::execution::context::ExecutionContext; + let mut ctx = ExecutionContext::new(); + ctx.register_table( + "test", + Arc::new(TestTableProvider { + num_rows: 100, + is_exact: true, + }), + ) + .unwrap(); + + let plan = ctx + .create_logical_plan("SELECT count(*) FROM test WHERE a < 5") + .unwrap(); + let expected = "\ + Projection: #COUNT(UInt8(1))\ + \n Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]\ + \n Filter: #test.a Lt Int64(5)\ + \n TableScan: test projection=None"; + + assert_optimized_plan_eq(&plan, expected); + Ok(()) + } + fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { let opt = AggregateStatistics::new(); let optimized_plan = opt.optimize(plan, &ExecutionProps::new()).unwrap(); From 3bc6fb68e8cf6b47106bbbf1ad49181f8c30bb06 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 28 Jun 2021 07:50:29 +0200 Subject: [PATCH 12/12] Fix name --- datafusion/src/optimizer/aggregate_statistics.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/src/optimizer/aggregate_statistics.rs b/datafusion/src/optimizer/aggregate_statistics.rs index adc63a2af069..a20eafc688b8 100644 --- a/datafusion/src/optimizer/aggregate_statistics.rs +++ b/datafusion/src/optimizer/aggregate_statistics.rs @@ -144,7 +144,7 @@ impl OptimizerRule for AggregateStatistics { } fn name(&self) -> &str { - "count_statistics" + "aggregate_statistics" } }