diff --git a/Cargo.toml b/Cargo.toml
index 36a9405b0fbe9..1ab431ea316a9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,7 +17,20 @@
 [workspace]
 exclude = ["datafusion-cli"]
-members = ["datafusion/common", "datafusion/core", "datafusion/expr", "datafusion/jit", "datafusion/optimizer", "datafusion/physical-expr", "datafusion/proto", "datafusion/row", "datafusion/sql", "datafusion-examples", "benchmarks",
+members = [
+    "datafusion/common",
+    "datafusion/core",
+    "datafusion/expr",
+    "datafusion/jit",
+    "datafusion/optimizer",
+    "datafusion/physical-expr",
+    "datafusion/proto",
+    "datafusion/row",
+    "datafusion/sql",
+    "datafusion-examples",
+    "test-utils",
+    "parquet-test-utils",
+    "benchmarks",
 ]
 
 [profile.release]
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 8795a86111930..dd9253b8d9923 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -41,6 +41,7 @@ mimalloc = { version = "0.1", optional = true, default-features = false }
 num_cpus = "1.13.0"
 object_store = "0.5.0"
 parquet = "25.0.0"
+parquet-test-utils = { path = "../parquet-test-utils/" }
 rand = "0.8.4"
 serde = { version = "1.0.136", features = ["derive"] }
 serde_json = "1.0.78"
diff --git a/benchmarks/src/bin/parquet_filter_pushdown.rs b/benchmarks/src/bin/parquet_filter_pushdown.rs
index 3efa86f27e949..4ec2dc90c37ba 100644
--- a/benchmarks/src/bin/parquet_filter_pushdown.rs
+++ b/benchmarks/src/bin/parquet_filter_pushdown.rs
@@ -15,30 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::datatypes::SchemaRef;
 use arrow::util::pretty;
-use datafusion::common::{Result, ToDFSchema};
-use datafusion::config::{
-    ConfigOptions, OPT_PARQUET_ENABLE_PAGE_INDEX, OPT_PARQUET_PUSHDOWN_FILTERS,
-    OPT_PARQUET_REORDER_FILTERS,
-};
-use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile};
-use datafusion::datasource::object_store::ObjectStoreUrl;
-use datafusion::execution::context::ExecutionProps;
+use datafusion::common::Result;
 use datafusion::logical_expr::{lit, or, Expr};
 use datafusion::optimizer::utils::disjunction;
-use datafusion::physical_expr::create_physical_expr;
 use datafusion::physical_plan::collect;
-use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
-use datafusion::physical_plan::filter::FilterExec;
 use datafusion::prelude::{col, SessionConfig, SessionContext};
-use object_store::path::Path;
-use object_store::ObjectMeta;
-use parquet::arrow::ArrowWriter;
-use parquet::file::properties::WriterProperties;
-use std::fs::File;
+use parquet_test_utils::{ParquetScanOptions, TestParquetFile};
 use std::path::PathBuf;
-use std::sync::Arc;
 use std::time::Instant;
 use structopt::StructOpt;
 use test_utils::AccessLogGenerator;
@@ -89,34 +73,16 @@ async fn main() -> Result<()> {
 
     let path = opt.path.join("logs.parquet");
 
-    let (schema, object_store_url, object_meta) =
-        gen_data(path, opt.scale_factor, opt.page_size, opt.row_group_size)?;
+    let test_file = gen_data(path, opt.scale_factor, opt.page_size, opt.row_group_size)?;
 
-    run_benchmarks(
-        &mut ctx,
-        schema,
-        object_store_url,
-        object_meta,
-        opt.iterations,
-        opt.debug,
-    )
-    .await?;
+    run_benchmarks(&mut ctx, &test_file, opt.iterations, opt.debug).await?;
 
     Ok(())
 }
 
-#[derive(Debug, Clone)]
-struct ParquetScanOptions {
-    pushdown_filters: bool,
-    reorder_filters: bool,
-    enable_page_index: bool,
-}
-
 async fn run_benchmarks(
     ctx: &mut SessionContext,
-    schema: SchemaRef,
-    object_store_url: ObjectStoreUrl,
-    object_meta: ObjectMeta,
+    test_file: &TestParquetFile,
     iterations: usize,
     debug: bool,
 ) -> Result<()> {
@@ -156,8 +122,7 @@ async fn run_benchmarks(
         disjunction([
             col("request_method").not_eq(lit("GET")),
             col("response_status").eq(lit(400_u16)),
-            // TODO this fails in the FilterExec with Error: Internal("The type of Dictionary(Int32, Utf8) = Utf8 of binary physical should be same")
-            // col("service").eq(lit("backend")),
+            col("service").eq(lit("backend")),
         ])
         .unwrap(),
         // Filter everything
@@ -174,9 +139,7 @@ async fn run_benchmarks(
                 let start = Instant::now();
                 let rows = exec_scan(
                     ctx,
-                    schema.clone(),
-                    object_store_url.clone(),
-                    object_meta.clone(),
+                    test_file,
                     filter_expr.clone(),
                     scan_options.clone(),
                     debug,
@@ -197,52 +160,12 @@ async fn run_benchmarks(
 
 async fn exec_scan(
     ctx: &SessionContext,
-    schema: SchemaRef,
-    object_store_url: ObjectStoreUrl,
-    object_meta: ObjectMeta,
+    test_file: &TestParquetFile,
     filter: Expr,
     scan_options: ParquetScanOptions,
     debug: bool,
 ) -> Result<usize> {
-    let ParquetScanOptions {
-        pushdown_filters,
-        reorder_filters,
-        enable_page_index,
-    } = scan_options;
-
-    let mut config_options = ConfigOptions::new();
-    config_options.set_bool(OPT_PARQUET_PUSHDOWN_FILTERS, pushdown_filters);
-    config_options.set_bool(OPT_PARQUET_REORDER_FILTERS, reorder_filters);
-    config_options.set_bool(OPT_PARQUET_ENABLE_PAGE_INDEX, enable_page_index);
-
-    let scan_config = FileScanConfig {
-        object_store_url,
-        file_schema: schema.clone(),
-        file_groups: vec![vec![PartitionedFile {
-            object_meta,
-            partition_values: vec![],
-            range: None,
-            extensions: None,
-        }]],
-        statistics: Default::default(),
-        projection: None,
-        limit: None,
-        table_partition_cols: vec![],
-        config_options: config_options.into_shareable(),
-    };
-
-    let df_schema = schema.clone().to_dfschema()?;
-
-    let physical_filter_expr = create_physical_expr(
-        &filter,
-        &df_schema,
-        schema.as_ref(),
-        &ExecutionProps::default(),
-    )?;
-
-    let parquet_exec = Arc::new(ParquetExec::new(scan_config, Some(filter), None));
-
-    let exec = Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?);
+    let exec = test_file.create_scan(filter, scan_options).await?;
 
     let task_ctx = ctx.task_ctx();
     let result = collect(exec, task_ctx).await?;
@@ -258,53 +181,15 @@ fn gen_data(
     scale_factor: f32,
     page_size: Option<usize>,
     row_group_size: Option<usize>,
-) -> Result<(SchemaRef, ObjectStoreUrl, ObjectMeta)> {
+) -> Result<TestParquetFile> {
     let generator = AccessLogGenerator::new();
 
-    let file = File::create(&path).unwrap();
-
-    let mut props_builder = WriterProperties::builder();
-
-    if let Some(s) = page_size {
-        props_builder = props_builder
-            .set_data_pagesize_limit(s)
-            .set_write_batch_size(s);
-    }
-
-    if let Some(s) = row_group_size {
-        props_builder = props_builder.set_max_row_group_size(s);
-    }
-
-    let schema = generator.schema();
-    let mut writer =
-        ArrowWriter::try_new(file, schema.clone(), Some(props_builder.build())).unwrap();
-
-    let mut num_rows = 0;
-    let num_batches = 100_f32 * scale_factor;
-    for batch in generator.take(num_batches as usize) {
-        writer.write(&batch).unwrap();
-        writer.flush()?;
-        num_rows += batch.num_rows();
-    }
-    writer.close().unwrap();
-
-    println!("Generated test dataset with {} rows", num_rows);
-
-    let size = std::fs::metadata(&path)?.len() as usize;
-
-    let canonical_path = path.canonicalize()?;
-
-    let object_store_url =
-        ListingTableUrl::parse(canonical_path.to_str().unwrap_or_default())?
-            .object_store();
-
-    let object_meta = ObjectMeta {
-        location: Path::parse(canonical_path.to_str().unwrap_or_default())?,
-        last_modified: Default::default(),
-        size,
-    };
-
-    Ok((schema, object_store_url, object_meta))
+    let num_batches = 100_f32 * scale_factor;
+
+    TestParquetFile::try_new(
+        path,
+        generator.take(num_batches as usize),
+        page_size,
+        row_group_size,
+    )
 }
diff --git a/parquet-test-utils/Cargo.toml b/parquet-test-utils/Cargo.toml
new file mode 100644
index 0000000000000..599cdc35b17fe
--- /dev/null
+++ b/parquet-test-utils/Cargo.toml
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "parquet-test-utils"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+datafusion = { path = "../datafusion/core" }
+object_store = "0.5.0"
+parquet = "25.0.0"
diff --git a/parquet-test-utils/src/lib.rs b/parquet-test-utils/src/lib.rs
new file mode 100644
index 0000000000000..6c14540169024
--- /dev/null
+++ b/parquet-test-utils/src/lib.rs
@@ -0,0 +1,214 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Helpers for writing parquet files and reading them back
+
+use std::fs::File;
+use std::path::PathBuf;
+use std::sync::Arc;
+
+use datafusion::arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
+use datafusion::common::ToDFSchema;
+use datafusion::config::{
+    ConfigOptions, OPT_PARQUET_ENABLE_PAGE_INDEX, OPT_PARQUET_PUSHDOWN_FILTERS,
+    OPT_PARQUET_REORDER_FILTERS,
+};
+use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile};
+use datafusion::datasource::object_store::ObjectStoreUrl;
+use datafusion::error::Result;
+use datafusion::optimizer::simplify_expressions::{ExprSimplifier, SimplifyContext};
+use datafusion::physical_expr::create_physical_expr;
+use datafusion::physical_expr::execution_props::ExecutionProps;
+use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
+use datafusion::physical_plan::filter::FilterExec;
+use datafusion::physical_plan::metrics::MetricsSet;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::Expr;
+use object_store::path::Path;
+use object_store::ObjectMeta;
+use parquet::arrow::ArrowWriter;
+use parquet::file::properties::WriterProperties;
+
+/// A parquet file that has been created for testing.
+pub struct TestParquetFile {
+    path: PathBuf,
+    schema: SchemaRef,
+    object_store_url: ObjectStoreUrl,
+    object_meta: ObjectMeta,
+}
+
+#[derive(Debug, Clone)]
+pub struct ParquetScanOptions {
+    pub pushdown_filters: bool,
+    pub reorder_filters: bool,
+    pub enable_page_index: bool,
+}
+
+impl TestParquetFile {
+    /// Creates a new parquet file at the specified location
+    pub fn try_new(
+        path: PathBuf,
+        batches: impl IntoIterator<Item = RecordBatch>,
+        page_size: Option<usize>,
+        row_group_size: Option<usize>,
+    ) -> Result<Self> {
+        let file = File::create(&path).unwrap();
+
+        let mut props_builder = WriterProperties::builder();
+
+        if let Some(s) = page_size {
+            props_builder = props_builder
+                .set_data_pagesize_limit(s)
+                .set_write_batch_size(s);
+        }
+
+        if let Some(s) = row_group_size {
+            props_builder = props_builder.set_max_row_group_size(s);
+        }
+
+        let mut batches = batches.into_iter();
+        let first_batch = batches.next().expect("need at least one record batch");
+        let schema = first_batch.schema();
+
+        let mut writer =
+            ArrowWriter::try_new(file, schema.clone(), Some(props_builder.build()))
+                .unwrap();
+
+        writer.write(&first_batch).unwrap();
+        writer.flush()?;
+        let mut num_rows = first_batch.num_rows();
+
+        for batch in batches {
+            writer.write(&batch).unwrap();
+            writer.flush()?;
+            num_rows += batch.num_rows();
+        }
+        writer.close().unwrap();
+
+        println!("Generated test dataset with {} rows", num_rows);
+
+        let size = std::fs::metadata(&path)?.len() as usize;
+
+        let canonical_path = path.canonicalize()?;
+
+        let object_store_url =
+            ListingTableUrl::parse(canonical_path.to_str().unwrap_or_default())?
+                .object_store();
+
+        let object_meta = ObjectMeta {
+            location: Path::parse(canonical_path.to_str().unwrap_or_default())?,
+            last_modified: Default::default(),
+            size,
+        };
+
+        Ok(Self {
+            path,
+            schema,
+            object_store_url,
+            object_meta,
+        })
+    }
+
+    /// Return a `ParquetExec` and `FilterExec` with the specified options to scan this parquet file.
+    ///
+    /// This returns the same plan that DataFusion will make with a pushed down predicate followed by a filter:
+    ///
+    /// ```text
+    /// (FilterExec)
+    ///   (ParquetExec)
+    /// ```
+    pub async fn create_scan(
+        &self,
+        filter: Expr,
+        scan_options: ParquetScanOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let ParquetScanOptions {
+            pushdown_filters,
+            reorder_filters,
+            enable_page_index,
+        } = scan_options;
+
+        let mut config_options = ConfigOptions::new();
+        config_options.set_bool(OPT_PARQUET_PUSHDOWN_FILTERS, pushdown_filters);
+        config_options.set_bool(OPT_PARQUET_REORDER_FILTERS, reorder_filters);
+        config_options.set_bool(OPT_PARQUET_ENABLE_PAGE_INDEX, enable_page_index);
+
+        let scan_config = FileScanConfig {
+            object_store_url: self.object_store_url.clone(),
+            file_schema: self.schema.clone(),
+            file_groups: vec![vec![PartitionedFile {
+                object_meta: self.object_meta.clone(),
+                partition_values: vec![],
+                range: None,
+                extensions: None,
+            }]],
+            statistics: Default::default(),
+            projection: None,
+            limit: None,
+            table_partition_cols: vec![],
+            config_options: config_options.into_shareable(),
+        };
+
+        let df_schema = self.schema.clone().to_dfschema_ref()?;
+
+        // run coercion on the filters to coerce types etc.
+        let props = ExecutionProps::new();
+        let context = SimplifyContext::new(&props).with_schema(df_schema.clone());
+        let simplifier = ExprSimplifier::new(context);
+        let filter = simplifier.coerce(filter, df_schema.clone()).unwrap();
+
+        let physical_filter_expr = create_physical_expr(
+            &filter,
+            &df_schema,
+            self.schema.as_ref(),
+            &ExecutionProps::default(),
+        )?;
+
+        let parquet_exec = Arc::new(ParquetExec::new(scan_config, Some(filter), None));
+
+        let exec = Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?);
+
+        Ok(exec)
+    }
+
+    /// Retrieve metrics from the parquet exec returned from `create_scan`
+    ///
+    /// Recursively searches for ParquetExec and returns the metrics
+    /// on the first one it finds
+    pub fn parquet_metrics(&self, plan: Arc<dyn ExecutionPlan>) -> Option<MetricsSet> {
+        if let Some(parquet) = plan.as_any().downcast_ref::<ParquetExec>() {
+            return parquet.metrics();
+        }
+
+        for child in plan.children() {
+            if let Some(metrics) = self.parquet_metrics(child) {
+                return Some(metrics);
+            }
+        }
+        None
+    }
+
+    /// The schema of this parquet file
+    pub fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    /// The path to the parquet file
+    pub fn path(&self) -> &std::path::Path {
+        self.path.as_path()
+    }
+}
diff --git a/test-utils/src/data_gen.rs b/test-utils/src/data_gen.rs
index a77728eea23c6..dd516d5fe4c92 100644
--- a/test-utils/src/data_gen.rs
+++ b/test-utils/src/data_gen.rs
@@ -190,6 +190,11 @@ pub struct AccessLogGenerator {
     schema: SchemaRef,
     rng: StdRng,
     host_idx: usize,
+
+    /// Optional limit on the number of rows produced
+    row_limit: Option<usize>,
+    /// How many rows have been returned so far
+    row_count: usize,
 }
 
 impl Default for AccessLogGenerator {
@@ -209,6 +214,8 @@ impl AccessLogGenerator {
             schema: BatchBuilder::schema(),
             host_idx: 0,
             rng: StdRng::from_seed(seed),
+            row_limit: None,
+            row_count: 0,
         }
     }
 
@@ -216,12 +223,25 @@ impl AccessLogGenerator {
     pub fn schema(&self) -> SchemaRef {
         self.schema.clone()
    }
+
+    /// Return up to `row_limit` rows
+    pub fn with_row_limit(mut self, row_limit: Option<usize>) -> Self {
+        self.row_limit = row_limit;
+        self
+    }
 }
 
 impl Iterator for AccessLogGenerator {
     type Item = RecordBatch;
 
     fn next(&mut self) -> Option<Self::Item> {
+        // if we have a limit and have passed it, stop generating
+        if let Some(limit) = self.row_limit {
+            if self.row_count >= limit {
+                return None;
+            }
+        }
+
         let mut builder = BatchBuilder::default();
 
         let host = format!(
@@ -236,6 +256,18 @@ impl Iterator for AccessLogGenerator {
             }
             builder.append(&mut self.rng, &host, service);
         }
-        Some(builder.finish(Arc::clone(&self.schema)))
+
+        let batch = builder.finish(Arc::clone(&self.schema));
+
+        // limit the batch if needed to stay under the row limit
+        let batch = if let Some(limit) = self.row_limit {
+            let num_rows = (limit - self.row_count).min(batch.num_rows());
+            batch.slice(0, num_rows)
+        } else {
+            batch
+        };
+
+        self.row_count += batch.num_rows();
+        Some(batch)
     }
 }
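
For reference, a minimal usage sketch of the new `TestParquetFile` and `AccessLogGenerator::with_row_limit` APIs introduced by this change. The output path, row limit, and filter below are illustrative values only, not taken from this diff:

```rust
use std::path::PathBuf;

use datafusion::error::Result;
use datafusion::prelude::{col, lit};
use parquet_test_utils::{ParquetScanOptions, TestParquetFile};
use test_utils::AccessLogGenerator;

async fn scan_example() -> Result<()> {
    // Generate a small access log dataset, capped at 1000 rows (illustrative value).
    let generator = AccessLogGenerator::new().with_row_limit(Some(1000));

    // Write the batches to a parquet file, using default page and row group sizes.
    let test_file = TestParquetFile::try_new(
        PathBuf::from("/tmp/logs.parquet"), // hypothetical output path
        generator,
        None,
        None,
    )?;

    // Build the FilterExec-over-ParquetExec plan with all pushdown options enabled.
    let scan_options = ParquetScanOptions {
        pushdown_filters: true,
        reorder_filters: true,
        enable_page_index: true,
    };
    let plan = test_file
        .create_scan(col("response_status").eq(lit(400_u16)), scan_options)
        .await?;

    // ParquetExec metrics can be pulled back out of the plan (populated once it has run).
    let _metrics = test_file.parquet_metrics(plan);

    Ok(())
}
```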