Skip to content

Commit

Permalink
fix: compile error due to merge stale PR (apache#646)
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <xxchan22f@gmail.com>
  • Loading branch information
xxchan authored Sep 24, 2024
1 parent 420b4e2 commit ab51355
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 11 deletions.
4 changes: 2 additions & 2 deletions crates/iceberg/src/arrow/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ impl ArrowReader {
try_join!(parquet_file.metadata(), parquet_file.reader())?;
let parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);

let should_load_page_index = row_selection_enabled && task.predicate().is_some();
let should_load_page_index = row_selection_enabled && task.predicate.is_some();

// Start creating the record batch stream, which wraps the parquet file reader
let mut record_batch_stream_builder = ParquetRecordBatchStreamBuilder::new_with_options(
Expand Down Expand Up @@ -245,7 +245,7 @@ impl ArrowReader {
record_batch_stream_builder.metadata(),
&selected_row_groups,
&field_id_map,
task.schema(),
&task.schema,
)?;

record_batch_stream_builder =
Expand Down
44 changes: 35 additions & 9 deletions crates/iceberg/src/expr/visitors/page_index_evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ use ordered_float::OrderedFloat;
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
use parquet::file::metadata::RowGroupMetaData;
use parquet::file::page_index::index::Index;
use parquet::format::PageLocation;
use parquet::file::page_index::offset_index::OffsetIndexMetaData;

use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};
use crate::expr::{BoundPredicate, BoundReference};
use crate::spec::{Datum, PrimitiveLiteral, PrimitiveType, Schema};
use crate::{Error, ErrorKind, Result};

type OffsetIndex = Vec<Vec<PageLocation>>;
type OffsetIndex = Vec<OffsetIndexMetaData>;

const IN_PREDICATE_LIMIT: usize = 200;

Expand Down Expand Up @@ -206,13 +206,14 @@ impl<'a> PageIndexEvaluator<'a> {
}

/// returns a list of row counts per page
fn calc_row_counts(&self, offset_index: &[PageLocation]) -> Vec<usize> {
fn calc_row_counts(&self, offset_index: &OffsetIndexMetaData) -> Vec<usize> {
let mut remaining_rows = self.row_group_metadata.num_rows() as usize;
let mut row_counts = Vec::with_capacity(self.offset_index.len());

for (idx, page_location) in offset_index.iter().enumerate() {
let row_count = if idx < offset_index.len() - 1 {
let row_count = (offset_index[idx + 1].first_row_index
let page_locations = offset_index.page_locations();
for (idx, page_location) in page_locations.iter().enumerate() {
let row_count = if idx < page_locations.len() - 1 {
let row_count = (page_locations[idx + 1].first_row_index
- page_location.first_row_index) as usize;
remaining_rows -= row_count;
row_count
Expand Down Expand Up @@ -868,6 +869,7 @@ mod tests {
use parquet::data_type::ByteArray;
use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
use parquet::file::page_index::index::{Index, NativeIndex, PageIndex};
use parquet::file::page_index::offset_index::OffsetIndexMetaData;
use parquet::file::statistics::Statistics;
use parquet::format::{BoundaryOrder, PageLocation};
use parquet::schema::types::{
Expand Down Expand Up @@ -1417,28 +1419,36 @@ mod tests {
Ok(row_group_metadata?)
}

fn create_page_index() -> Result<(Vec<Index>, Vec<Vec<PageLocation>>)> {
fn create_page_index() -> Result<(Vec<Index>, Vec<OffsetIndexMetaData>)> {
let idx_float = Index::FLOAT(NativeIndex::<f32> {
indexes: vec![
PageIndex {
min: None,
max: None,
null_count: Some(1024),
repetition_level_histogram: None,
definition_level_histogram: None,
},
PageIndex {
min: Some(0.0),
max: Some(10.0),
null_count: Some(0),
repetition_level_histogram: None,
definition_level_histogram: None,
},
PageIndex {
min: Some(10.0),
max: Some(20.0),
null_count: Some(1),
repetition_level_histogram: None,
definition_level_histogram: None,
},
PageIndex {
min: None,
max: None,
null_count: None,
repetition_level_histogram: None,
definition_level_histogram: None,
},
],
boundary_order: BoundaryOrder(0), // UNORDERED
Expand All @@ -1450,26 +1460,36 @@ mod tests {
min: Some("AA".into()),
max: Some("DD".into()),
null_count: Some(0),
repetition_level_histogram: None,
definition_level_histogram: None,
},
PageIndex {
min: Some("DE".into()),
max: Some("DE".into()),
null_count: Some(0),
repetition_level_histogram: None,
definition_level_histogram: None,
},
PageIndex {
min: Some("DF".into()),
max: Some("UJ".into()),
null_count: Some(1),
repetition_level_histogram: None,
definition_level_histogram: None,
},
PageIndex {
min: None,
max: None,
null_count: Some(48),
repetition_level_histogram: None,
definition_level_histogram: None,
},
PageIndex {
min: None,
max: None,
null_count: None,
repetition_level_histogram: None,
definition_level_histogram: None,
},
],
boundary_order: BoundaryOrder(0), // UNORDERED
Expand All @@ -1491,8 +1511,14 @@ mod tests {
];

Ok((vec![idx_float, idx_string], vec![
page_locs_float,
page_locs_string,
OffsetIndexMetaData {
page_locations: page_locs_float,
unencoded_byte_array_data_bytes: None,
},
OffsetIndexMetaData {
page_locations: page_locs_string,
unencoded_byte_array_data_bytes: None,
},
]))
}
}

0 comments on commit ab51355

Please sign in to comment.