Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor: reduce indent level in page filter pruning code #5105

Merged
merged 2 commits into from
Feb 1, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 74 additions & 58 deletions datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,69 +141,85 @@ impl PagePruningPredicate {

let file_offset_indexes = file_metadata.offset_indexes();
let file_page_indexes = file_metadata.page_indexes();
if let (Some(file_offset_indexes), Some(file_page_indexes)) =
(file_offset_indexes, file_page_indexes)
{
let mut row_selections = Vec::with_capacity(page_index_predicates.len());
for predicate in page_index_predicates {
// `extract_page_index_push_down_predicates` only returns predicates with one column.
// When building a `PruningPredicate`, some single-column filters like `abs(i) = 1`
// will be rewritten to `lit(true)`, so `required_columns` may be empty.
let (file_offset_indexes, file_page_indexes) =
match (file_offset_indexes, file_page_indexes) {
(Some(o), Some(i)) => (o, i),
_ => {
trace!(
"skip page pruning due to lack of indexes. Have offset: {} file: {}",
file_offset_indexes.is_some(), file_page_indexes.is_some()
);
return Ok(None);
}
};

let mut row_selections = Vec::with_capacity(page_index_predicates.len());
for predicate in page_index_predicates {
// `extract_page_index_push_down_predicates` only returns predicates with one column.
// When building a `PruningPredicate`, some single-column filters like `abs(i) = 1`
// will be rewritten to `lit(true)`, so `required_columns` may be empty.
let col_id =
if let Some(&col_id) = predicate.need_input_columns_ids().iter().next() {
let mut selectors = Vec::with_capacity(row_groups.len());
for r in row_groups.iter() {
let rg_offset_indexes = file_offset_indexes.get(*r);
let rg_page_indexes = file_page_indexes.get(*r);
if let (Some(rg_page_indexes), Some(rg_offset_indexes)) =
(rg_page_indexes, rg_offset_indexes)
{
selectors.extend(
prune_pages_in_one_row_group(
&groups[*r],
predicate,
rg_offset_indexes.get(col_id),
rg_page_indexes.get(col_id),
groups[*r].column(col_id).column_descr(),
file_metrics,
)
.map_err(|e| {
ArrowError::ParquetError(format!(
"Fail in prune_pages_in_one_row_group: {e}"
))
}),
);
} else {
trace!(
"Did not have enough metadata to prune with page indexes, falling back, falling back to all rows",
);
// fallback select all rows
let all_selected =
vec![RowSelector::select(groups[*r].num_rows() as usize)];
selectors.push(all_selected);
}
}
debug!(
"Use filter and page index create RowSelection {:?} from predicate: {:?}",
&selectors,
predicate.predicate_expr(),
col_id
} else {
continue;
};

let mut selectors = Vec::with_capacity(row_groups.len());
for r in row_groups.iter() {
let rg_offset_indexes = file_offset_indexes.get(*r);
let rg_page_indexes = file_page_indexes.get(*r);
if let (Some(rg_page_indexes), Some(rg_offset_indexes)) =
(rg_page_indexes, rg_offset_indexes)
{
selectors.extend(
prune_pages_in_one_row_group(
&groups[*r],
predicate,
rg_offset_indexes.get(col_id),
rg_page_indexes.get(col_id),
groups[*r].column(col_id).column_descr(),
file_metrics,
)
.map_err(|e| {
ArrowError::ParquetError(format!(
"Fail in prune_pages_in_one_row_group: {e}"
))
}),
);
row_selections
.push(selectors.into_iter().flatten().collect::<Vec<_>>());
}
}
let final_selection = combine_multi_col_selection(row_selections);
let total_skip = final_selection.iter().fold(0, |acc, x| {
if x.skip {
acc + x.row_count
} else {
acc
trace!(
"Did not have enough metadata to prune with page indexes, \
falling back, falling back to all rows",
);
// fallback select all rows
let all_selected =
vec![RowSelector::select(groups[*r].num_rows() as usize)];
selectors.push(all_selected);
}
});
file_metrics.page_index_rows_filtered.add(total_skip);
Ok(Some(final_selection))
} else {
Ok(None)
}
debug!(
"Use filter and page index create RowSelection {:?} from predicate: {:?}",
&selectors,
predicate.predicate_expr(),
);
row_selections.push(selectors.into_iter().flatten().collect::<Vec<_>>());
}

let final_selection = combine_multi_col_selection(row_selections);
let total_skip =
final_selection.iter().fold(
0,
|acc, x| {
if x.skip {
acc + x.row_count
} else {
acc
}
},
);
file_metrics.page_index_rows_filtered.add(total_skip);
Ok(Some(final_selection))
}
}

Expand Down