Skip to content

Commit

Permalink
Minor: reduce indent level in page filter pruning code
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Jan 29, 2023
1 parent 842bda3 commit 9c3f400
Showing 1 changed file with 75 additions and 60 deletions.
135 changes: 75 additions & 60 deletions datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,71 +139,86 @@ impl PagePruningPredicate {
let page_index_predicates = &self.predicates;
let groups = file_metadata.row_groups();

let file_offset_indexes = file_metadata.offset_indexes();
let file_page_indexes = file_metadata.page_indexes();
if let (Some(file_offset_indexes), Some(file_page_indexes)) =
(file_offset_indexes, file_page_indexes)
{
let mut row_selections = Vec::with_capacity(page_index_predicates.len());
for predicate in page_index_predicates {
// `extract_page_index_push_down_predicates` only return predicate with one col.
// when building `PruningPredicate`, some single column filter like `abs(i) = 1`
// will be rewrite to `lit(true)`, so may have an empty required_columns.
let (file_offset_indexes, file_page_indexes) =
match (file_metadata.offset_indexes(), file_metadata.page_indexes()) {
(Some(o), Some(i)) => (o, i),
_ => {
trace!(
"skip page pruning due to lack of indexes. Haev offset: {} file: {}",
file_metadata.offset_indexes().is_some(),
file_metadata.page_indexes().is_some()
);
return Ok(None);
}
};

let mut row_selections = Vec::with_capacity(page_index_predicates.len());
for predicate in page_index_predicates {
// `extract_page_index_push_down_predicates` only return predicate with one col.
// when building `PruningPredicate`, some single column filter like `abs(i) = 1`
// will be rewrite to `lit(true)`, so may have an empty required_columns.
let col_id =
if let Some(&col_id) = predicate.need_input_columns_ids().iter().next() {
let mut selectors = Vec::with_capacity(row_groups.len());
for r in row_groups.iter() {
let rg_offset_indexes = file_offset_indexes.get(*r);
let rg_page_indexes = file_page_indexes.get(*r);
if let (Some(rg_page_indexes), Some(rg_offset_indexes)) =
(rg_page_indexes, rg_offset_indexes)
{
selectors.extend(
prune_pages_in_one_row_group(
&groups[*r],
predicate,
rg_offset_indexes.get(col_id),
rg_page_indexes.get(col_id),
groups[*r].column(col_id).column_descr(),
file_metrics,
)
.map_err(|e| {
ArrowError::ParquetError(format!(
"Fail in prune_pages_in_one_row_group: {e}"
))
}),
);
} else {
trace!(
"Did not have enough metadata to prune with page indexes, falling back, falling back to all rows",
);
// fallback select all rows
let all_selected =
vec![RowSelector::select(groups[*r].num_rows() as usize)];
selectors.push(all_selected);
}
}
debug!(
"Use filter and page index create RowSelection {:?} from predicate: {:?}",
&selectors,
predicate.predicate_expr(),
col_id
} else {
continue;
};

let mut selectors = Vec::with_capacity(row_groups.len());
for r in row_groups.iter() {
let rg_offset_indexes = file_offset_indexes.get(*r);
let rg_page_indexes = file_page_indexes.get(*r);
if let (Some(rg_page_indexes), Some(rg_offset_indexes)) =
(rg_page_indexes, rg_offset_indexes)
{
selectors.extend(
prune_pages_in_one_row_group(
&groups[*r],
predicate,
rg_offset_indexes.get(col_id),
rg_page_indexes.get(col_id),
groups[*r].column(col_id).column_descr(),
file_metrics,
)
.map_err(|e| {
ArrowError::ParquetError(format!(
"Fail in prune_pages_in_one_row_group: {e}"
))
}),
);
row_selections
.push(selectors.into_iter().flatten().collect::<Vec<_>>());
}
}
let final_selection = combine_multi_col_selection(row_selections);
let total_skip = final_selection.iter().fold(0, |acc, x| {
if x.skip {
acc + x.row_count
} else {
acc
trace!(
"Did not have enough metadata to prune with page indexes, \
falling back, falling back to all rows",
);
// fallback select all rows
let all_selected =
vec![RowSelector::select(groups[*r].num_rows() as usize)];
selectors.push(all_selected);
}
});
file_metrics.page_index_rows_filtered.add(total_skip);
Ok(Some(final_selection))
} else {
Ok(None)
}
debug!(
"Use filter and page index create RowSelection {:?} from predicate: {:?}",
&selectors,
predicate.predicate_expr(),
);
row_selections.push(selectors.into_iter().flatten().collect::<Vec<_>>());
}

let final_selection = combine_multi_col_selection(row_selections);
let total_skip =
final_selection.iter().fold(
0,
|acc, x| {
if x.skip {
acc + x.row_count
} else {
acc
}
},
);
file_metrics.page_index_rows_filtered.add(total_skip);
Ok(Some(final_selection))
}
}

Expand Down

0 comments on commit 9c3f400

Please sign in to comment.