Skip to content

Commit

Permalink
Minor: Add new bloom filter predicate tests (apache#8433)
Browse files Browse the repository at this point in the history
* Minor: Add new bloom filter tests

* fmt
  • Loading branch information
alamb authored Dec 8, 2023
1 parent d771f26 commit d43a70d
Showing 1 changed file with 113 additions and 4 deletions.
117 changes: 113 additions & 4 deletions datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ mod tests {
use arrow::datatypes::Schema;
use arrow::datatypes::{DataType, Field};
use datafusion_common::{config::ConfigOptions, TableReference, ToDFSchema};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::{
builder::LogicalTableSource, cast, col, lit, AggregateUDF, Expr, ScalarUDF,
TableSource, WindowUDF,
Expand Down Expand Up @@ -994,6 +995,26 @@ mod tests {
create_physical_expr(expr, &df_schema, schema, &execution_props).unwrap()
}

// Note the values in the `String` column are:
// ❯ select * from './parquet-testing/data/data_index_bloom_encoding_stats.parquet';
// +-----------+
// | String |
// +-----------+
// | Hello |
// | This is |
// | a |
// | test |
// | How |
// | are you |
// | doing |
// | today |
// | the quick |
// | brown fox |
// | jumps |
// | over |
// | the lazy |
// | dog |
// +-----------+
#[tokio::test]
async fn test_row_group_bloom_filter_pruning_predicate_simple_expr() {
// load parquet file
Expand All @@ -1002,7 +1023,7 @@ mod tests {
let path = format!("{testdata}/{file_name}");
let data = bytes::Bytes::from(std::fs::read(path).unwrap());

// generate pruning predicate
// generate pruning predicate `(String = "Hello_Not_Exists")`
let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]);
let expr = col(r#""String""#).eq(lit("Hello_Not_Exists"));
let expr = logical2physical(&expr, &schema);
Expand All @@ -1029,7 +1050,7 @@ mod tests {
let path = format!("{testdata}/{file_name}");
let data = bytes::Bytes::from(std::fs::read(path).unwrap());

// generate pruning predicate
// generate pruning predicate `(String = "Hello_Not_Exists" OR String = "Hello_Not_Exists2")`
let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]);
let expr = lit("1").eq(lit("1")).and(
col(r#""String""#)
Expand Down Expand Up @@ -1091,7 +1112,7 @@ mod tests {
let path = format!("{testdata}/{file_name}");
let data = bytes::Bytes::from(std::fs::read(path).unwrap());

// generate pruning predicate
// generate pruning predicate `(String = "Hello")`
let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]);
let expr = col(r#""String""#).eq(lit("Hello"));
let expr = logical2physical(&expr, &schema);
Expand All @@ -1110,6 +1131,94 @@ mod tests {
assert_eq!(pruned_row_groups, row_groups);
}

#[tokio::test]
async fn test_row_group_bloom_filter_pruning_predicate_with_exists_2_values() {
    // Read the bloom-filter-bearing test parquet file into memory.
    let test_dir = datafusion_common::test_util::parquet_test_data();
    let file_name = "data_index_bloom_encoding_stats.parquet";
    let file_bytes =
        bytes::Bytes::from(std::fs::read(format!("{test_dir}/{file_name}")).unwrap());

    // Build pruning predicate `(String = "Hello") OR (String = "the quick")`.
    // Both literals appear in the file, so the row group must NOT be pruned.
    let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]);
    let hello = col(r#""String""#).eq(lit("Hello"));
    let the_quick = col(r#""String""#).eq(lit("the quick"));
    let expr = logical2physical(&hello.or(the_quick), &schema);
    let pruning_predicate =
        PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();

    // The file has a single row group; expect it to survive pruning.
    let row_groups = vec![0];
    let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
        file_name,
        file_bytes,
        &pruning_predicate,
        &row_groups,
    )
    .await
    .unwrap();
    assert_eq!(pruned_row_groups, row_groups);
}

#[tokio::test]
async fn test_row_group_bloom_filter_pruning_predicate_with_exists_3_values() {
    // Read the bloom-filter-bearing test parquet file into memory.
    let test_dir = datafusion_common::test_util::parquet_test_data();
    let file_name = "data_index_bloom_encoding_stats.parquet";
    let file_bytes =
        bytes::Bytes::from(std::fs::read(format!("{test_dir}/{file_name}")).unwrap());

    // Build pruning predicate
    // `(String = "Hello") OR (String = "the quick") OR (String = "are you")`.
    // All three literals appear in the file, so the row group must NOT be pruned.
    let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]);
    let string_col = || col(r#""String""#);
    let predicate = string_col()
        .eq(lit("Hello"))
        .or(string_col().eq(lit("the quick")))
        .or(string_col().eq(lit("are you")));
    let expr = logical2physical(&predicate, &schema);
    let pruning_predicate =
        PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();

    // The file has a single row group; expect it to survive pruning.
    let row_groups = vec![0];
    let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
        file_name,
        file_bytes,
        &pruning_predicate,
        &row_groups,
    )
    .await
    .unwrap();
    assert_eq!(pruned_row_groups, row_groups);
}

#[tokio::test]
async fn test_row_group_bloom_filter_pruning_predicate_with_or_not_eq() {
    // load parquet file
    let testdata = datafusion_common::test_util::parquet_test_data();
    let file_name = "data_index_bloom_encoding_stats.parquet";
    let path = format!("{testdata}/{file_name}");
    let data = bytes::Bytes::from(std::fs::read(path).unwrap());

    // generate pruning predicate `(String != "foo") OR (String != "bar")`
    // (both sides are `not_eq`): bloom filters can only prove a value is
    // absent, never that all rows differ from it, so `!=` predicates cannot
    // prune and the row group must be kept.
    let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]);
    let expr = col(r#""String""#)
        .not_eq(lit("foo"))
        .or(col(r#""String""#).not_eq(lit("bar")));
    let expr = logical2physical(&expr, &schema);
    let pruning_predicate =
        PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();

    // the single row group in the file must survive pruning unchanged
    let row_groups = vec![0];
    let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
        file_name,
        data,
        &pruning_predicate,
        &row_groups,
    )
    .await
    .unwrap();
    assert_eq!(pruned_row_groups, row_groups);
}

#[tokio::test]
async fn test_row_group_bloom_filter_pruning_predicate_without_bloom_filter() {
// load parquet file
Expand All @@ -1118,7 +1227,7 @@ mod tests {
let path = format!("{testdata}/{file_name}");
let data = bytes::Bytes::from(std::fs::read(path).unwrap());

// generate pruning predicate
// generate pruning predicate on a column without a bloom filter
let schema = Schema::new(vec![Field::new("string_col", DataType::Utf8, false)]);
let expr = col(r#""string_col""#).eq(lit("0"));
let expr = logical2physical(&expr, &schema);
Expand Down

0 comments on commit d43a70d

Please sign in to comment.