Table Scan: Add Row Group Skipping (apache#558)
* feat(scan): add row group and page index row selection filtering

* fix(row selection): off-by-one error

* feat: remove row selection to defer to a second PR

* feat: better min/max val conversion in RowGroupMetricsEvaluator

* test(row_group_filtering): first three tests

* test(row_group_filtering): next few tests

* test: add more tests for RowGroupMetricsEvaluator

* chore: refactor test assertions to silence clippy lints

* refactor: consolidate parquet stat min/max parsing in one place
sdd authored Aug 29, 2024
1 parent da08e8d commit 7aa8bdd
Showing 8 changed files with 2,187 additions and 238 deletions.
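For context on what this commit adds: row group skipping compares the scan predicate against the per-column min/max statistics stored in each Parquet row group and drops the row groups that provably cannot contain a matching row, so they are never fetched or decoded. The following self-contained Rust sketch illustrates the idea with simplified stand-in types; it is not the crate's actual RowGroupMetricsEvaluator API.

// Illustrative stand-ins for per-row-group column statistics.
struct ColumnStats {
    min: i64,
    max: i64,
}

struct RowGroupStats {
    id: ColumnStats, // statistics for a hypothetical `id` column
}

// Keep only the row groups whose [min, max] range could contain a match for
// the predicate `id == target`; everything else is skipped entirely.
fn select_row_groups(row_groups: &[RowGroupStats], target: i64) -> Vec<usize> {
    row_groups
        .iter()
        .enumerate()
        .filter(|(_, rg)| rg.id.min <= target && target <= rg.id.max)
        .map(|(idx, _)| idx)
        .collect()
}

fn main() {
    let stats = vec![
        RowGroupStats { id: ColumnStats { min: 0, max: 99 } },
        RowGroupStats { id: ColumnStats { min: 100, max: 199 } },
    ];
    // Only the second row group (index 1) can contain id == 150.
    assert_eq!(select_row_groups(&stats, 150), vec![1]);
}

The real implementation delegates this per-row-group decision to RowGroupMetricsEvaluator::eval, as shown in the reader.rs diff below.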
2 changes: 2 additions & 0 deletions Cargo.toml
@@ -72,9 +72,11 @@ once_cell = "1"
opendal = "0.49"
ordered-float = "4"
parquet = "52"
paste = "1"
pilota = "0.11.2"
pretty_assertions = "1.4"
port_scanner = "0.1.5"
rand = "0.8"
regex = "1.10.5"
reqwest = { version = "0.12", default-features = false, features = ["json"] }
rust_decimal = "1.31"
2 changes: 2 additions & 0 deletions crates/iceberg/Cargo.toml
@@ -66,6 +66,7 @@ once_cell = { workspace = true }
opendal = { workspace = true }
ordered-float = { workspace = true }
parquet = { workspace = true, features = ["async"] }
paste = { workspace = true }
reqwest = { workspace = true }
rust_decimal = { workspace = true }
serde = { workspace = true }
@@ -84,5 +85,6 @@ ctor = { workspace = true }
iceberg-catalog-memory = { workspace = true }
iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
pretty_assertions = { workspace = true }
rand = { workspace = true }
tempfile = { workspace = true }
tera = { workspace = true }
210 changes: 145 additions & 65 deletions crates/iceberg/src/arrow/reader.rs
@@ -23,7 +23,7 @@ use std::str::FromStr;
use std::sync::Arc;

use arrow_arith::boolean::{and, is_not_null, is_null, not, or};
use arrow_array::{ArrayRef, BooleanArray, RecordBatch};
use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch};
use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
use arrow_schema::{ArrowError, DataType, SchemaRef as ArrowSchemaRef};
use arrow_string::like::starts_with;
@@ -32,7 +32,7 @@ use fnv::FnvHashSet;
use futures::channel::mpsc::{channel, Sender};
use futures::future::BoxFuture;
use futures::{try_join, SinkExt, StreamExt, TryFutureExt, TryStreamExt};
use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, RowFilter};
use parquet::arrow::async_reader::{AsyncFileReader, MetadataLoader};
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
use parquet::file::metadata::ParquetMetaData;
@@ -41,6 +41,7 @@ use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};
use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
use crate::error::Result;
use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};
use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
use crate::expr::{BoundPredicate, BoundReference};
use crate::io::{FileIO, FileMetadata, FileRead};
use crate::runtime::spawn;
@@ -54,6 +55,7 @@ pub struct ArrowReaderBuilder {
batch_size: Option<usize>,
file_io: FileIO,
concurrency_limit_data_files: usize,
row_group_filtering_enabled: bool,
}

impl ArrowReaderBuilder {
@@ -65,13 +67,13 @@ impl ArrowReaderBuilder {
batch_size: None,
file_io,
concurrency_limit_data_files: num_cpus,
row_group_filtering_enabled: true,
}
}

/// Sets the max number of in flight data files that are being fetched
pub fn with_data_file_concurrency_limit(mut self, val: usize) -> Self {
self.concurrency_limit_data_files = val;

self
}

Expand All @@ -82,12 +84,19 @@ impl ArrowReaderBuilder {
self
}

/// Determines whether to enable row group filtering.
pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
self.row_group_filtering_enabled = row_group_filtering_enabled;
self
}

/// Build the ArrowReader.
pub fn build(self) -> ArrowReader {
ArrowReader {
batch_size: self.batch_size,
file_io: self.file_io,
concurrency_limit_data_files: self.concurrency_limit_data_files,
row_group_filtering_enabled: self.row_group_filtering_enabled,
}
}
}
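A usage sketch for the new builder option, assuming the builder and reader types are re-exported under iceberg::arrow and using arbitrary example values; row group filtering is enabled by default, so the setter mainly exists to let callers opt out.

use iceberg::arrow::{ArrowReader, ArrowReaderBuilder};

// Sketch only: configures a builder the caller has already obtained
// (normally via a table scan); the values below are illustrative.
fn configure_reader(builder: ArrowReaderBuilder) -> ArrowReader {
    builder
        .with_batch_size(1024)                  // arbitrary example value
        .with_data_file_concurrency_limit(4)    // arbitrary example value
        .with_row_group_filtering_enabled(true) // on by default; shown for clarity
        .build()
}

Disabling the option forces every row group of each data file to be decoded, which can be useful when validating or benchmarking the pruning logic itself.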
@@ -100,6 +109,8 @@ pub struct ArrowReader {

/// the maximum number of data files that can be fetched at the same time
concurrency_limit_data_files: usize,

row_group_filtering_enabled: bool,
}

impl ArrowReader {
@@ -109,6 +120,7 @@ impl ArrowReader {
let file_io = self.file_io.clone();
let batch_size = self.batch_size;
let concurrency_limit_data_files = self.concurrency_limit_data_files;
let row_group_filtering_enabled = self.row_group_filtering_enabled;

let (tx, rx) = channel(concurrency_limit_data_files);
let mut channel_for_error = tx.clone();
@@ -124,8 +136,14 @@ impl ArrowReader {
let file_path = task.data_file_path().to_string();

spawn(async move {
Self::process_file_scan_task(task, batch_size, file_io, tx)
.await
Self::process_file_scan_task(
task,
batch_size,
file_io,
tx,
row_group_filtering_enabled,
)
.await
})
.await
.map_err(|e| e.with_context("file_path", file_path))
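The surrounding read() method (unchanged apart from threading the new flag through) fans file-scan tasks out to spawned workers that stream RecordBatches back over a bounded channel. A minimal, self-contained sketch of that producer/consumer pattern using only the futures crate, independent of the crate's runtime and error types, with placeholder file names:

use futures::channel::mpsc::channel;
use futures::{SinkExt, StreamExt};

fn main() {
    futures::executor::block_on(async {
        // Bounded channel, analogous to concurrency_limit_data_files above.
        let (mut tx, mut rx) = channel::<String>(2);

        // Producer: stands in for the spawned per-file tasks that send batches.
        let producer = async move {
            for path in ["a.parquet", "b.parquet"] {
                tx.send(format!("batches from {path}")).await.unwrap();
            }
            // `tx` is dropped here, which closes the stream for the consumer.
        };

        // Consumer: stands in for the stream handed back to the caller of read().
        let consumer = async move {
            while let Some(msg) = rx.next().await {
                println!("{msg}");
            }
        };

        futures::join!(producer, consumer);
    });
}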
@@ -149,55 +167,95 @@ impl ArrowReader {
batch_size: Option<usize>,
file_io: FileIO,
mut tx: Sender<Result<RecordBatch>>,
row_group_filtering_enabled: bool,
) -> Result<()> {
// Collect Parquet column indices from field ids
let mut collector = CollectFieldIdVisitor {
field_ids: HashSet::default(),
};

if let Some(predicates) = task.predicate() {
visit(&mut collector, predicates)?;
}

// Get the metadata for the Parquet file we need to read and build
// a reader for the data within
let parquet_file = file_io.new_input(task.data_file_path())?;

let (parquet_metadata, parquet_reader) =
try_join!(parquet_file.metadata(), parquet_file.reader())?;
let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
let parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);

let mut batch_stream_builder =
ParquetRecordBatchStreamBuilder::new(arrow_file_reader).await?;
// Start creating the record batch stream, which wraps the parquet file reader
let mut record_batch_stream_builder = ParquetRecordBatchStreamBuilder::new_with_options(
parquet_file_reader,
// Page index will be required in upcoming row selection PR
ArrowReaderOptions::new().with_page_index(false),
)
.await?;

let parquet_schema = batch_stream_builder.parquet_schema();
let arrow_schema = batch_stream_builder.schema();
// Create a projection mask for the batch stream to select which columns in the
// Parquet file that we want in the response
let projection_mask = Self::get_arrow_projection_mask(
task.project_field_ids(),
task.schema(),
parquet_schema,
arrow_schema,
record_batch_stream_builder.parquet_schema(),
record_batch_stream_builder.schema(),
)?;
batch_stream_builder = batch_stream_builder.with_projection(projection_mask);

let parquet_schema = batch_stream_builder.parquet_schema();
let row_filter = Self::get_row_filter(task.predicate(), parquet_schema, &collector)?;

if let Some(row_filter) = row_filter {
batch_stream_builder = batch_stream_builder.with_row_filter(row_filter);
}
record_batch_stream_builder = record_batch_stream_builder.with_projection(projection_mask);

if let Some(batch_size) = batch_size {
batch_stream_builder = batch_stream_builder.with_batch_size(batch_size);
record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
}

let mut batch_stream = batch_stream_builder.build()?;
if let Some(predicate) = task.predicate() {
let (iceberg_field_ids, field_id_map) = Self::build_field_id_set_and_map(
record_batch_stream_builder.parquet_schema(),
predicate,
)?;

let row_filter = Self::get_row_filter(
predicate,
record_batch_stream_builder.parquet_schema(),
&iceberg_field_ids,
&field_id_map,
)?;
record_batch_stream_builder = record_batch_stream_builder.with_row_filter(row_filter);

let mut selected_row_groups = None;
if row_group_filtering_enabled {
let result = Self::get_selected_row_group_indices(
predicate,
record_batch_stream_builder.metadata(),
&field_id_map,
task.schema(),
)?;

selected_row_groups = Some(result);
}

if let Some(selected_row_groups) = selected_row_groups {
record_batch_stream_builder =
record_batch_stream_builder.with_row_groups(selected_row_groups);
}
}

while let Some(batch) = batch_stream.try_next().await? {
// Build the batch stream and send all the RecordBatches that it generates
// to the requester.
let mut record_batch_stream = record_batch_stream_builder.build()?;
while let Some(batch) = record_batch_stream.try_next().await? {
tx.send(Ok(batch)).await?
}

Ok(())
}

fn build_field_id_set_and_map(
parquet_schema: &SchemaDescriptor,
predicate: &BoundPredicate,
) -> Result<(HashSet<i32>, HashMap<i32, usize>)> {
// Collects all Iceberg field IDs referenced in the filter predicate
let mut collector = CollectFieldIdVisitor {
field_ids: HashSet::default(),
};
visit(&mut collector, predicate)?;

let iceberg_field_ids = collector.field_ids();
let field_id_map = build_field_id_map(parquet_schema)?;

Ok((iceberg_field_ids, field_id_map))
}

fn get_arrow_projection_mask(
field_ids: &[i32],
iceberg_schema_of_task: &Schema,
@@ -269,43 +327,59 @@ impl ArrowReader {
}

fn get_row_filter(
predicates: Option<&BoundPredicate>,
predicates: &BoundPredicate,
parquet_schema: &SchemaDescriptor,
collector: &CollectFieldIdVisitor,
) -> Result<Option<RowFilter>> {
if let Some(predicates) = predicates {
let field_id_map = build_field_id_map(parquet_schema)?;

// Collect Parquet column indices from field ids.
// If the field id is not found in Parquet schema, it will be ignored due to schema evolution.
let mut column_indices = collector
.field_ids
.iter()
.filter_map(|field_id| field_id_map.get(field_id).cloned())
.collect::<Vec<_>>();

column_indices.sort();

// The converter that converts `BoundPredicates` to `ArrowPredicates`
let mut converter = PredicateConverter {
parquet_schema,
column_map: &field_id_map,
column_indices: &column_indices,
};

// After collecting required leaf column indices used in the predicate,
// creates the projection mask for the Arrow predicates.
let projection_mask = ProjectionMask::leaves(parquet_schema, column_indices.clone());
let predicate_func = visit(&mut converter, predicates)?;
let arrow_predicate = ArrowPredicateFn::new(projection_mask, predicate_func);
Ok(Some(RowFilter::new(vec![Box::new(arrow_predicate)])))
} else {
Ok(None)
iceberg_field_ids: &HashSet<i32>,
field_id_map: &HashMap<i32, usize>,
) -> Result<RowFilter> {
// Collect Parquet column indices from field ids.
// If the field id is not found in Parquet schema, it will be ignored due to schema evolution.
let mut column_indices = iceberg_field_ids
.iter()
.filter_map(|field_id| field_id_map.get(field_id).cloned())
.collect::<Vec<_>>();
column_indices.sort();

// The converter that converts `BoundPredicates` to `ArrowPredicates`
let mut converter = PredicateConverter {
parquet_schema,
column_map: field_id_map,
column_indices: &column_indices,
};

// After collecting required leaf column indices used in the predicate,
// creates the projection mask for the Arrow predicates.
let projection_mask = ProjectionMask::leaves(parquet_schema, column_indices.clone());
let predicate_func = visit(&mut converter, predicates)?;
let arrow_predicate = ArrowPredicateFn::new(projection_mask, predicate_func);
Ok(RowFilter::new(vec![Box::new(arrow_predicate)]))
}

fn get_selected_row_group_indices(
predicate: &BoundPredicate,
parquet_metadata: &Arc<ParquetMetaData>,
field_id_map: &HashMap<i32, usize>,
snapshot_schema: &Schema,
) -> Result<Vec<usize>> {
let row_groups_metadata = parquet_metadata.row_groups();
let mut results = Vec::with_capacity(row_groups_metadata.len());

for (idx, row_group_metadata) in row_groups_metadata.iter().enumerate() {
if RowGroupMetricsEvaluator::eval(
predicate,
row_group_metadata,
field_id_map,
snapshot_schema,
)? {
results.push(idx);
}
}

Ok(results)
}
}

/// Build the map of field id to Parquet column index in the schema.
/// Build the map of parquet field id to Parquet column index in the schema.
fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result<HashMap<i32, usize>> {
let mut column_map = HashMap::new();
for (idx, field) in parquet_schema.columns().iter().enumerate() {
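Once get_selected_row_group_indices has produced the indices of row groups that may contain matches, they are handed to the Parquet reader builder (via with_row_groups above) so that skipped groups are never decoded. A minimal sketch of the same handoff using the parquet crate's synchronous reader, with a hypothetical file path and caller-supplied indices purely for illustration:

use std::fs::File;

use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

// Sketch: decode only the row groups that survived pruning. The crate itself
// uses the async ParquetRecordBatchStreamBuilder shown in the diff above.
fn read_selected_row_groups(
    path: &str,                      // hypothetical path to a Parquet data file
    selected_row_groups: Vec<usize>, // e.g. the output of the row-group evaluator
) -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open(path)?;
    let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
        .with_row_groups(selected_row_groups)
        .build()?;

    for batch in reader {
        println!("decoded {} rows", batch?.num_rows());
    }
    Ok(())
}

In the async path, the equivalent call is record_batch_stream_builder.with_row_groups(selected_row_groups), as added by this commit.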
Expand Down Expand Up @@ -345,6 +419,12 @@ struct CollectFieldIdVisitor {
field_ids: HashSet<i32>,
}

impl CollectFieldIdVisitor {
fn field_ids(self) -> HashSet<i32> {
self.field_ids
}
}

impl BoundPredicateVisitor for CollectFieldIdVisitor {
type T = ();

(The diffs for the remaining five changed files are not shown here.)
