Extract common parquet testing code to parquet-test-util crate (apache#4042)

* Extract common parquet testing code to `parquet-test-util` crate

* fix doc tests
alamb authored and Dandandan committed Nov 5, 2022
1 parent 5f7dc1b commit 96a454b
Showing 6 changed files with 306 additions and 133 deletions.
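
The shape of the refactor is visible in the call sites below: instead of hand-building schemas, object store URLs, and physical plans, the benchmark now asks the new crate for a TestParquetFile and a scan over it. A rough usage sketch follows, with every signature inferred from this diff rather than from the parquet-test-utils source (which is not shown on this page):

// Sketch only: the TestParquetFile / ParquetScanOptions signatures are
// inferred from how the benchmark below uses them; the real crate may differ.
use datafusion::common::Result;
use datafusion::physical_plan::collect;
use datafusion::prelude::{col, lit, SessionContext};
use parquet_test_utils::{ParquetScanOptions, TestParquetFile};
use test_utils::AccessLogGenerator;

async fn filter_scan_example() -> Result<usize> {
    // Write 100 generated access-log batches to a parquet file; page and
    // row-group sizes are optional knobs (both Option<usize>).
    let generator = AccessLogGenerator::new();
    let test_file =
        TestParquetFile::try_new("/tmp/logs.parquet".into(), generator.take(100), None, None)?;

    // Toggle the three OPT_PARQUET_* config flags for this scan.
    let scan_options = ParquetScanOptions {
        pushdown_filters: true,
        reorder_filters: true,
        enable_page_index: true,
    };

    // create_scan presumably builds the same ParquetExec + FilterExec plan
    // that exec_scan used to assemble by hand (see the removed code below).
    let filter = col("response_status").eq(lit(400_u16));
    let exec = test_file.create_scan(filter, scan_options).await?;

    let ctx = SessionContext::new();
    let batches = collect(exec, ctx.task_ctx()).await?;
    Ok(batches.iter().map(|b| b.num_rows()).sum())
}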
15 changes: 14 additions & 1 deletion Cargo.toml
@@ -17,7 +17,20 @@

[workspace]
exclude = ["datafusion-cli"]
members = ["datafusion/common", "datafusion/core", "datafusion/expr", "datafusion/jit", "datafusion/optimizer", "datafusion/physical-expr", "datafusion/proto", "datafusion/row", "datafusion/sql", "datafusion-examples", "benchmarks",
members = [
"datafusion/common",
"datafusion/core",
"datafusion/expr",
"datafusion/jit",
"datafusion/optimizer",
"datafusion/physical-expr",
"datafusion/proto",
"datafusion/row",
"datafusion/sql",
"datafusion-examples",
"test-utils",
"parquet-test-utils",
"benchmarks",
]

[profile.release]
1 change: 1 addition & 0 deletions benchmarks/Cargo.toml
@@ -41,6 +41,7 @@ mimalloc = { version = "0.1", optional = true, default-features = false }
num_cpus = "1.13.0"
object_store = "0.5.0"
parquet = "25.0.0"
+parquet-test-utils = { path = "../parquet-test-utils/" }
rand = "0.8.4"
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.78"
147 changes: 16 additions & 131 deletions benchmarks/src/bin/parquet_filter_pushdown.rs
@@ -15,30 +15,14 @@
// specific language governing permissions and limitations
// under the License.

-use arrow::datatypes::SchemaRef;
use arrow::util::pretty;
-use datafusion::common::{Result, ToDFSchema};
-use datafusion::config::{
-    ConfigOptions, OPT_PARQUET_ENABLE_PAGE_INDEX, OPT_PARQUET_PUSHDOWN_FILTERS,
-    OPT_PARQUET_REORDER_FILTERS,
-};
-use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile};
-use datafusion::datasource::object_store::ObjectStoreUrl;
-use datafusion::execution::context::ExecutionProps;
+use datafusion::common::Result;
use datafusion::logical_expr::{lit, or, Expr};
use datafusion::optimizer::utils::disjunction;
-use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_plan::collect;
-use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
-use datafusion::physical_plan::filter::FilterExec;
use datafusion::prelude::{col, SessionConfig, SessionContext};
-use object_store::path::Path;
-use object_store::ObjectMeta;
-use parquet::arrow::ArrowWriter;
-use parquet::file::properties::WriterProperties;
-use std::fs::File;
+use parquet_test_utils::{ParquetScanOptions, TestParquetFile};
use std::path::PathBuf;
-use std::sync::Arc;
use std::time::Instant;
use structopt::StructOpt;
use test_utils::AccessLogGenerator;
@@ -89,34 +73,16 @@ async fn main() -> Result<()> {

    let path = opt.path.join("logs.parquet");

-    let (schema, object_store_url, object_meta) =
-        gen_data(path, opt.scale_factor, opt.page_size, opt.row_group_size)?;
+    let test_file = gen_data(path, opt.scale_factor, opt.page_size, opt.row_group_size)?;

-    run_benchmarks(
-        &mut ctx,
-        schema,
-        object_store_url,
-        object_meta,
-        opt.iterations,
-        opt.debug,
-    )
-    .await?;
+    run_benchmarks(&mut ctx, &test_file, opt.iterations, opt.debug).await?;

    Ok(())
}

-#[derive(Debug, Clone)]
-struct ParquetScanOptions {
-    pushdown_filters: bool,
-    reorder_filters: bool,
-    enable_page_index: bool,
-}
-
async fn run_benchmarks(
    ctx: &mut SessionContext,
-    schema: SchemaRef,
-    object_store_url: ObjectStoreUrl,
-    object_meta: ObjectMeta,
+    test_file: &TestParquetFile,
    iterations: usize,
    debug: bool,
) -> Result<()> {
@@ -156,8 +122,7 @@ async fn run_benchmarks(
        disjunction([
            col("request_method").not_eq(lit("GET")),
            col("response_status").eq(lit(400_u16)),
-            // TODO this fails in the FilterExec with Error: Internal("The type of Dictionary(Int32, Utf8) = Utf8 of binary physical should be same")
-            // col("service").eq(lit("backend")),
+            col("service").eq(lit("backend")),
        ])
        .unwrap(),
        // Filter everything
@@ -174,9 +139,7 @@
        let start = Instant::now();
        let rows = exec_scan(
            ctx,
-            schema.clone(),
-            object_store_url.clone(),
-            object_meta.clone(),
+            test_file,
            filter_expr.clone(),
            scan_options.clone(),
            debug,
@@ -197,52 +160,12 @@

async fn exec_scan(
    ctx: &SessionContext,
-    schema: SchemaRef,
-    object_store_url: ObjectStoreUrl,
-    object_meta: ObjectMeta,
+    test_file: &TestParquetFile,
    filter: Expr,
    scan_options: ParquetScanOptions,
    debug: bool,
) -> Result<usize> {
-    let ParquetScanOptions {
-        pushdown_filters,
-        reorder_filters,
-        enable_page_index,
-    } = scan_options;
-
-    let mut config_options = ConfigOptions::new();
-    config_options.set_bool(OPT_PARQUET_PUSHDOWN_FILTERS, pushdown_filters);
-    config_options.set_bool(OPT_PARQUET_REORDER_FILTERS, reorder_filters);
-    config_options.set_bool(OPT_PARQUET_ENABLE_PAGE_INDEX, enable_page_index);
-
-    let scan_config = FileScanConfig {
-        object_store_url,
-        file_schema: schema.clone(),
-        file_groups: vec![vec![PartitionedFile {
-            object_meta,
-            partition_values: vec![],
-            range: None,
-            extensions: None,
-        }]],
-        statistics: Default::default(),
-        projection: None,
-        limit: None,
-        table_partition_cols: vec![],
-        config_options: config_options.into_shareable(),
-    };
-
-    let df_schema = schema.clone().to_dfschema()?;
-
-    let physical_filter_expr = create_physical_expr(
-        &filter,
-        &df_schema,
-        schema.as_ref(),
-        &ExecutionProps::default(),
-    )?;
-
-    let parquet_exec = Arc::new(ParquetExec::new(scan_config, Some(filter), None));
-
-    let exec = Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?);
+    let exec = test_file.create_scan(filter, scan_options).await?;

    let task_ctx = ctx.task_ctx();
    let result = collect(exec, task_ctx).await?;
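
Everything deleted from exec_scan above (the ConfigOptions flags, the FileScanConfig, the ParquetExec, and the FilterExec wrapper) presumably moves behind TestParquetFile::create_scan. Below is a hedged reconstruction of that method from the removed lines, assuming the file's SchemaRef, ObjectStoreUrl, and ObjectMeta become fields of TestParquetFile; the field names are hypothetical:

// Reassembled from the benchmark code removed above; the actual method in
// parquet-test-utils may differ in structure and naming.
use arrow::datatypes::SchemaRef;
use datafusion::common::{Result, ToDFSchema};
use datafusion::config::{
    ConfigOptions, OPT_PARQUET_ENABLE_PAGE_INDEX, OPT_PARQUET_PUSHDOWN_FILTERS,
    OPT_PARQUET_REORDER_FILTERS,
};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::execution::context::ExecutionProps;
use datafusion::logical_expr::Expr;
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::ExecutionPlan;
use object_store::ObjectMeta;
use std::sync::Arc;

#[derive(Debug, Clone)]
pub struct ParquetScanOptions {
    pub pushdown_filters: bool,
    pub reorder_filters: bool,
    pub enable_page_index: bool,
}

pub struct TestParquetFile {
    // Hypothetical fields, captured when the file is written:
    schema: SchemaRef,
    object_store_url: ObjectStoreUrl,
    object_meta: ObjectMeta,
}

impl TestParquetFile {
    pub async fn create_scan(
        &self,
        filter: Expr,
        scan_options: ParquetScanOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Map the scan options onto the session config flags.
        let mut config_options = ConfigOptions::new();
        config_options.set_bool(OPT_PARQUET_PUSHDOWN_FILTERS, scan_options.pushdown_filters);
        config_options.set_bool(OPT_PARQUET_REORDER_FILTERS, scan_options.reorder_filters);
        config_options.set_bool(OPT_PARQUET_ENABLE_PAGE_INDEX, scan_options.enable_page_index);

        // A single file group containing just this file.
        let scan_config = FileScanConfig {
            object_store_url: self.object_store_url.clone(),
            file_schema: self.schema.clone(),
            file_groups: vec![vec![PartitionedFile {
                object_meta: self.object_meta.clone(),
                partition_values: vec![],
                range: None,
                extensions: None,
            }]],
            statistics: Default::default(),
            projection: None,
            limit: None,
            table_partition_cols: vec![],
            config_options: config_options.into_shareable(),
        };

        // The same predicate goes to ParquetExec (for pushdown) and to a
        // FilterExec on top (to apply whatever was not pushed down).
        let df_schema = self.schema.clone().to_dfschema()?;
        let physical_filter_expr = create_physical_expr(
            &filter,
            &df_schema,
            self.schema.as_ref(),
            &ExecutionProps::default(),
        )?;
        let parquet_exec = Arc::new(ParquetExec::new(scan_config, Some(filter), None));
        Ok(Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?))
    }
}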
@@ -258,53 +181,15 @@ fn gen_data(
    scale_factor: f32,
    page_size: Option<usize>,
    row_group_size: Option<usize>,
-) -> Result<(SchemaRef, ObjectStoreUrl, ObjectMeta)> {
+) -> Result<TestParquetFile> {
    let generator = AccessLogGenerator::new();

-    let file = File::create(&path).unwrap();
-
-    let mut props_builder = WriterProperties::builder();
-
-    if let Some(s) = page_size {
-        props_builder = props_builder
-            .set_data_pagesize_limit(s)
-            .set_write_batch_size(s);
-    }
-
-    if let Some(s) = row_group_size {
-        props_builder = props_builder.set_max_row_group_size(s);
-    }
-
-    let schema = generator.schema();
-    let mut writer =
-        ArrowWriter::try_new(file, schema.clone(), Some(props_builder.build())).unwrap();
-
-    let mut num_rows = 0;
-
    let num_batches = 100_f32 * scale_factor;

-    for batch in generator.take(num_batches as usize) {
-        writer.write(&batch).unwrap();
-        writer.flush()?;
-        num_rows += batch.num_rows();
-    }
-    writer.close().unwrap();
-
-    println!("Generated test dataset with {} rows", num_rows);
-
-    let size = std::fs::metadata(&path)?.len() as usize;
-
-    let canonical_path = path.canonicalize()?;
-
-    let object_store_url =
-        ListingTableUrl::parse(canonical_path.to_str().unwrap_or_default())?
-            .object_store();
-
-    let object_meta = ObjectMeta {
-        location: Path::parse(canonical_path.to_str().unwrap_or_default())?,
-        last_modified: Default::default(),
-        size,
-    };
-
-    Ok((schema, object_store_url, object_meta))
+    TestParquetFile::try_new(
+        path,
+        generator.take(num_batches as usize),
+        page_size,
+        row_group_size,
+    )
}
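
Likewise, the writer code removed from gen_data presumably becomes the body of TestParquetFile::try_new. A sketch reassembled from the deleted lines, with one necessary guess: since try_new receives an iterator of batches instead of the generator, the schema is taken from the first batch (hypothetical detail):

// Reassembled from the writer logic removed above; field names are
// hypothetical and the real crate may differ in details.
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion::common::Result;
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::datasource::object_store::ObjectStoreUrl;
use object_store::path::Path;
use object_store::ObjectMeta;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use std::fs::File;
use std::path::PathBuf;

pub struct TestParquetFile {
    schema: SchemaRef,
    object_store_url: ObjectStoreUrl,
    object_meta: ObjectMeta,
}

impl TestParquetFile {
    pub fn try_new(
        path: PathBuf,
        batches: impl IntoIterator<Item = RecordBatch>,
        page_size: Option<usize>,
        row_group_size: Option<usize>,
    ) -> Result<Self> {
        // Translate the two optional knobs into parquet WriterProperties.
        let mut props_builder = WriterProperties::builder();
        if let Some(s) = page_size {
            props_builder = props_builder
                .set_data_pagesize_limit(s)
                .set_write_batch_size(s);
        }
        if let Some(s) = row_group_size {
            props_builder = props_builder.set_max_row_group_size(s);
        }

        // Take the schema from the first batch, since try_new only sees an
        // iterator of batches rather than the generator itself.
        let mut batches = batches.into_iter().peekable();
        let schema = batches.peek().expect("at least one batch").schema();

        let file = File::create(&path)?;
        let mut writer =
            ArrowWriter::try_new(file, schema.clone(), Some(props_builder.build()))?;
        for batch in batches {
            writer.write(&batch)?;
        }
        writer.close()?;

        // Rebuild the object store coordinates exactly as gen_data used to.
        let size = std::fs::metadata(&path)?.len() as usize;
        let canonical_path = path.canonicalize()?;
        let object_store_url =
            ListingTableUrl::parse(canonical_path.to_str().unwrap_or_default())?
                .object_store();
        let object_meta = ObjectMeta {
            location: Path::parse(canonical_path.to_str().unwrap_or_default())?,
            last_modified: Default::default(),
            size,
        };

        Ok(Self {
            schema,
            object_store_url,
            object_meta,
        })
    }
}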
28 changes: 28 additions & 0 deletions parquet-test-utils/Cargo.toml
@@ -0,0 +1,28 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[package]
name = "parquet-test-utils"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
datafusion = { path = "../datafusion/core" }
object_store = "0.5.0"
parquet = "25.0.0"
(Diffs for the two remaining changed files, including the new crate's source, did not load on this page.)
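
Taken together, the call sites imply a small public surface for the new crate. A plausible skeleton of its lib.rs, inferred purely from usage in this commit (the actual source is among the diffs that did not load):

// Hypothetical shape of parquet-test-utils/src/lib.rs; see the two
// reconstructions above for plausible method bodies.
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion::common::Result;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::ExecutionPlan;
use object_store::ObjectMeta;
use std::path::PathBuf;
use std::sync::Arc;

/// Scan-time knobs the benchmark toggles per run.
#[derive(Debug, Clone)]
pub struct ParquetScanOptions {
    pub pushdown_filters: bool,
    pub reorder_filters: bool,
    pub enable_page_index: bool,
}

/// A parquet file on local disk plus everything needed to scan it.
pub struct TestParquetFile {
    path: PathBuf,
    schema: SchemaRef,
    object_store_url: ObjectStoreUrl,
    object_meta: ObjectMeta,
}

impl TestParquetFile {
    /// Write `batches` to `path` and remember the scan coordinates.
    pub fn try_new(
        path: PathBuf,
        batches: impl IntoIterator<Item = RecordBatch>,
        page_size: Option<usize>,
        row_group_size: Option<usize>,
    ) -> Result<Self> {
        todo!("see the reconstruction of gen_data above")
    }

    /// Build a ParquetExec + FilterExec plan for `filter` under `scan_options`.
    pub async fn create_scan(
        &self,
        filter: Expr,
        scan_options: ParquetScanOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        todo!("see the reconstruction of exec_scan above")
    }
}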
