Commit 325c253

nameexhaustion committed Sep 12, 2024
1 parent ca383a0 commit 325c253
Showing 5 changed files with 334 additions and 60 deletions.
1 change: 1 addition & 0 deletions crates/polars-io/src/parquet/read/mod.rs
@@ -41,4 +41,5 @@ pub use utils::materialize_empty_df;
pub mod _internal {
    pub use super::mmap::to_deserializer;
    pub use super::predicates::read_this_row_group;
    pub use super::read_impl::{calc_prefilter_cost, PrefilterMaskSetting};
}
120 changes: 65 additions & 55 deletions crates/polars-io/src/parquet/read/read_impl.rs
@@ -281,19 +281,7 @@ fn rg_to_dfs_prefiltered(
debug_assert_eq!(live_idx_to_col_idx.len(), num_live_columns);
debug_assert_eq!(dead_idx_to_col_idx.len(), num_dead_columns);

enum MaskSetting {
Auto,
Pre,
Post,
}

let mask_setting =
std::env::var("POLARS_PQ_PREFILTERED_MASK").map_or(MaskSetting::Auto, |v| match &v[..] {
"auto" => MaskSetting::Auto,
"pre" => MaskSetting::Pre,
"post" => MaskSetting::Post,
_ => panic!("Invalid `POLARS_PQ_PREFILTERED_MASK` value '{v}'."),
});
let mask_setting = PrefilterMaskSetting::init_from_env();

let dfs: Vec<Option<DataFrame>> = POOL.install(|| {
// Set partitioned fields to prevent quadratic behavior.
@@ -371,29 +359,8 @@ fn rg_to_dfs_prefiltered(
return Ok(Some(df));
}

let prefilter_cost = matches!(mask_setting, MaskSetting::Auto)
.then(|| {
let num_edges = filter_mask.num_edges() as f64;
let rg_len = filter_mask.len() as f64;

// @GB: I did quite some analysis on this.
//
// Pre-filtered and Post-filtered can both be faster in certain scenarios.
//
// - Pre-filtered is faster when there is some amount of clustering or
// sorting involved or if the number of values selected is small.
// - Post-filtering is faster when the predicate selects a somewhat random
// elements throughout the row group.
//
// The following is a heuristic value to try and estimate which one is
// faster. Essentially, it sees how many times it needs to switch between
// skipping items and collecting items and compares it against the number
// of values that it will collect.
//
// Closer to 0: pre-filtering is probably better.
// Closer to 1: post-filtering is probably better.
(num_edges / rg_len).clamp(0.0, 1.0)
})
let prefilter_cost = matches!(mask_setting, PrefilterMaskSetting::Auto)
.then(|| calc_prefilter_cost(&filter_mask))
.unwrap_or_default();

let rg_columns = (0..num_dead_columns)
@@ -440,25 +407,13 @@
array.filter(&mask_arr)
};

let array = match mask_setting {
MaskSetting::Auto => {
// Prefiltering is more expensive for nested types so we make the cut-off
// higher.
let is_nested =
schema.get_at_index(col_idx).unwrap().1.dtype.is_nested();

// We empirically selected these numbers.
let do_prefilter = (is_nested && prefilter_cost <= 0.01)
|| (!is_nested && prefilter_cost <= 0.02);

if do_prefilter {
pre()?
} else {
post()?
}
},
MaskSetting::Pre => pre()?,
MaskSetting::Post => post()?,
let array = if mask_setting.should_prefilter(
prefilter_cost,
&schema.get_at_index(col_idx).unwrap().1.dtype,
) {
pre()?
} else {
post()?
};

debug_assert_eq!(array.len(), filter_mask.set_bits());
@@ -1235,3 +1190,58 @@ impl BatchedParquetIter {
}
}
}

pub fn calc_prefilter_cost(mask: &arrow::bitmap::Bitmap) -> f64 {
    let num_edges = mask.num_edges() as f64;
    let rg_len = mask.len() as f64;

    // @GB: I did quite some analysis on this.
    //
    // Pre-filtered and Post-filtered can both be faster in certain scenarios.
    //
    // - Pre-filtered is faster when there is some amount of clustering or
    //   sorting involved or if the number of values selected is small.
    // - Post-filtering is faster when the predicate selects somewhat random
    //   elements throughout the row group.
    //
    // The following is a heuristic value to try and estimate which one is
    // faster. Essentially, it sees how many times it needs to switch between
    // skipping items and collecting items and compares it against the number
    // of values that it will collect.
    //
    // Closer to 0: pre-filtering is probably better.
    // Closer to 1: post-filtering is probably better.
    (num_edges / rg_len).clamp(0.0, 1.0)
}

pub enum PrefilterMaskSetting {
    Auto,
    Pre,
    Post,
}

impl PrefilterMaskSetting {
    pub fn init_from_env() -> Self {
        std::env::var("POLARS_PQ_PREFILTERED_MASK").map_or(Self::Auto, |v| match &v[..] {
            "auto" => Self::Auto,
            "pre" => Self::Pre,
            "post" => Self::Post,
            _ => panic!("Invalid `POLARS_PQ_PREFILTERED_MASK` value '{v}'."),
        })
    }

    pub fn should_prefilter(&self, prefilter_cost: f64, dtype: &ArrowDataType) -> bool {
        match self {
            Self::Auto => {
                // Prefiltering is more expensive for nested types so we make the cut-off
                // higher.
                let is_nested = dtype.is_nested();

                // We empirically selected these numbers.
                (is_nested && prefilter_cost <= 0.01) || (!is_nested && prefilter_cost <= 0.02)
            },
            Self::Pre => true,
            Self::Post => false,
        }
    }
}
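As a rough illustration of how the heuristic behaves, here is a minimal, self-contained sketch (hypothetical helper, not part of this commit; the real code operates on an arrow `Bitmap` via `num_edges`) that reproduces the cost formula on a plain bool slice and contrasts a clustered selection with a scattered one:

fn prefilter_cost_sketch(mask: &[bool]) -> f64 {
    // An "edge" here is a transition between a skipped and a collected value.
    let num_edges = mask.windows(2).filter(|w| w[0] != w[1]).count() as f64;
    let rg_len = mask.len() as f64;
    (num_edges / rg_len).clamp(0.0, 1.0)
}

fn main() {
    let rg_len = 10_000;

    // 500 rows selected as one contiguous run: 2 edges -> cost 0.0002,
    // well below the empirical `Auto` cut-off of 0.02, so pre-filtering is chosen.
    let mut clustered = vec![false; rg_len];
    for i in 1_000..1_500 {
        clustered[i] = true;
    }

    // 500 rows selected one-by-one across the row group: 999 edges -> cost ~0.1,
    // above the cut-off, so post-filtering is chosen.
    let mut scattered = vec![false; rg_len];
    for i in (0..rg_len).step_by(20) {
        scattered[i] = true;
    }

    println!("clustered: {}", prefilter_cost_sketch(&clustered)); // 0.0002
    println!("scattered: {}", prefilter_cost_sketch(&scattered)); // 0.0999
}

Setting `POLARS_PQ_PREFILTERED_MASK` to `pre` or `post` bypasses the cost estimate entirely and forces one strategy, as `init_from_env` and `should_prefilter` above show.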
1 change: 1 addition & 0 deletions crates/polars-io/src/predicates.rs
@@ -8,6 +8,7 @@ pub trait PhysicalIoExpr: Send + Sync {
    fn evaluate_io(&self, df: &DataFrame) -> PolarsResult<Series>;

    /// Get the variables that are used in the expression i.e. live variables.
    /// This can contain duplicates.
    fn live_variables(&self) -> Option<Vec<PlSmallStr>>;

    /// Can take &dyn Statistics and determine of a file should be
88 changes: 87 additions & 1 deletion crates/polars-stream/src/nodes/parquet_source/init.rs
@@ -6,6 +6,7 @@ use futures::StreamExt;
use polars_core::frame::DataFrame;
use polars_error::PolarsResult;
use polars_io::prelude::ParallelStrategy;
use polars_io::prelude::_internal::PrefilterMaskSetting;

use super::row_group_data_fetch::RowGroupDataFetcher;
use super::row_group_decode::RowGroupDecoder;
@@ -287,12 +288,67 @@ impl ParquetSourceNode {
let ideal_morsel_size = get_ideal_morsel_size();
let min_values_per_thread = self.config.min_values_per_thread;

let use_prefiltered = physical_predicate.is_some()
let mut use_prefiltered = physical_predicate.is_some()
&& matches!(
self.options.parallel,
ParallelStrategy::Auto | ParallelStrategy::Prefiltered
);

let predicate_arrow_field_indices = if use_prefiltered {
let v = physical_predicate
.as_ref()
.unwrap()
.live_variables()
.filter(|x| x.len() < projected_arrow_schema.len())
.and_then(|x| {
let mut out = x
.iter()
// Can be `None` - if the column is e.g. a hive column, or the row index column.
.filter_map(|x| projected_arrow_schema.index_of(x))
.collect::<Vec<_>>();

out.sort_unstable();
out.dedup();
// There is at least one non-predicate column.
(out.len() < projected_arrow_schema.len()).then_some(out)
});

use_prefiltered &= v.is_some();

v.unwrap_or_default()
} else {
vec![]
};

let use_prefiltered = use_prefiltered.then(PrefilterMaskSetting::init_from_env);

let non_predicate_arrow_field_indices = if use_prefiltered.is_some() {
filtered_range(
predicate_arrow_field_indices.as_slice(),
projected_arrow_schema.len(),
)
} else {
vec![]
};

if use_prefiltered.is_some() && self.verbose {
eprintln!(
"[ParquetSource]: Pre-filtered decode enabled ({} live, {} non-live)",
predicate_arrow_field_indices.len(),
non_predicate_arrow_field_indices.len()
)
}

let predicate_arrow_field_mask = if use_prefiltered.is_some() {
let mut out = vec![false; projected_arrow_schema.len()];
for i in predicate_arrow_field_indices.iter() {
out[*i] = true;
}
out
} else {
vec![]
};

RowGroupDecoder {
scan_sources,
hive_partitions,
@@ -302,6 +358,9 @@ impl ParquetSourceNode {
row_index,
physical_predicate,
use_prefiltered,
predicate_arrow_field_indices,
non_predicate_arrow_field_indices,
predicate_arrow_field_mask,
ideal_morsel_size,
min_values_per_thread,
}
@@ -341,3 +400,30 @@ impl ParquetSourceNode {
}
}
}

/// Returns 0..len in a Vec, excluding indices in `exclude`.
/// `exclude` needs to be a sorted list of unique values.
fn filtered_range(exclude: &[usize], len: usize) -> Vec<usize> {
    let mut j = 0;

    (0..len)
        .filter(|&i| {
            if j == exclude.len() || i != exclude[j] {
                true
            } else {
                j += 1;
                false
            }
        })
        .collect()
}

mod tests {

    #[test]
    fn test_filtered_range() {
        use super::filtered_range;
        assert_eq!(filtered_range(&[1, 3], 7).as_slice(), &[0, 2, 4, 5, 6]);
        assert_eq!(filtered_range(&[1, 6], 7).as_slice(), &[0, 2, 3, 4, 5,]);
    }
}
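The live/non-live bookkeeping added to `init.rs` above reduces to: map the predicate's live variables to indices in the projected schema, sort and dedupe them, fall back to regular decoding when no projected column is left over, and derive the complementary indices plus a boolean field mask. A standalone sketch under those assumptions (toy string schema and a hypothetical `split_live_columns` helper, not the actual `ParquetSourceNode` code):

/// Same contract as the `filtered_range` added in this commit: 0..len minus a
/// sorted list of unique indices to exclude.
fn filtered_range(exclude: &[usize], len: usize) -> Vec<usize> {
    let mut j = 0;
    (0..len)
        .filter(|&i| {
            if j == exclude.len() || i != exclude[j] {
                true
            } else {
                j += 1;
                false
            }
        })
        .collect()
}

/// Toy version of the live/non-live column split. `projected` is the projected
/// schema (column names in order); `live` is the predicate's live variables,
/// which may contain duplicates and columns that are not projected (e.g. hive
/// columns). Returns `None` when pre-filtered decoding should be disabled.
fn split_live_columns(
    projected: &[&str],
    live: &[&str],
) -> Option<(Vec<usize>, Vec<usize>, Vec<bool>)> {
    let mut live_idx: Vec<usize> = live
        .iter()
        .filter_map(|name| projected.iter().position(|col| col == name))
        .collect();
    live_idx.sort_unstable();
    live_idx.dedup();

    // Pre-filtering only pays off if at least one projected column is non-live.
    if live_idx.len() >= projected.len() {
        return None;
    }

    let dead_idx = filtered_range(&live_idx, projected.len());
    let mut field_mask = vec![false; projected.len()];
    for &i in &live_idx {
        field_mask[i] = true;
    }

    Some((live_idx, dead_idx, field_mask))
}

fn main() {
    let projected = ["a", "b", "c", "d"];
    // The predicate references `b` twice plus a hive column that is not projected.
    let (live, dead, mask) = split_live_columns(&projected, &["b", "b", "hive_col"]).unwrap();
    assert_eq!(live, [1]);
    assert_eq!(dead, [0, 2, 3]);
    assert_eq!(mask, [false, true, false, false]);
}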