Add support for level histograms added in PARQUET-2261 to `ParquetMetaData` (#6105)

* bump `tonic` to 0.12 and `prost` to 0.13 for `arrow-flight` (#6041)

* bump `tonic` to 0.12 and `prost` to 0.13 for `arrow-flight`

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* fix example tests

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

---------

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* Remove `impl<T: AsRef<[u8]>> From<T> for Buffer`  that easily accidentally copies data (#6043)

* deprecate auto copy, ask explicit reference

* update comments

* make cargo doc happy

* Make display of interval types more pretty (#6006)

* improve display for interval.

* update test in pretty, and fix display problem.

* tmp

* fix tests in arrow-cast.

* fix tests in pretty.

* fix style.

* Update snafu (#5930)

* Update Parquet thrift generated structures (#6045)

* update to latest thrift (as of 11 Jul 2024) from parquet-format

* pass None for optional size statistics

* escape HTML tags

* don't need to escape brackets in arrays

* Revert "Revert "Write Bloom filters between row groups instead of the end  (#…" (#5933)

This reverts commit 22e0b44.

* Revert "Update snafu (#5930)" (#6069)

This reverts commit 756b1fb.

* Update pyo3 requirement from 0.21.1 to 0.22.1 (fixed) (#6075)

* Update pyo3 requirement from 0.21.1 to 0.22.1

Updates the requirements on [pyo3](https://github.com/pyo3/pyo3) to permit the latest version.
- [Release notes](https://github.com/pyo3/pyo3/releases)
- [Changelog](https://github.com/PyO3/pyo3/blob/main/CHANGELOG.md)
- [Commits](PyO3/pyo3@v0.21.1...v0.22.1)

---
updated-dependencies:
- dependency-name: pyo3
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>

* refactor: remove deprecated `FromPyArrow::from_pyarrow`

"GIL Refs" are being phased out.

* chore: update `pyo3` in integration tests

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* remove repeated code to make the code more concise. (#6080)

* Add `unencoded_byte_array_data_bytes` to `ParquetMetaData` (#6068)

* update to latest thrift (as of 11 Jul 2024) from parquet-format

* pass None for optional size statistics

* escape HTML tags

* don't need to escape brackets in arrays

* add support for unencoded_byte_array_data_bytes

* add comments

* change sig of ColumnMetrics::update_variable_length_bytes()

* rename ParquetOffsetIndex to OffsetSizeIndex

* rename some functions

* suggestion from review

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* add Default trait to ColumnMetrics as suggested in review

* rename OffsetSizeIndex to OffsetIndexMetaData

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* deprecate read_page_locations

* add level histograms to metadata

* add to_thrift() to OffsetIndexMetaData

* Update pyo3 requirement from 0.21.1 to 0.22.2 (#6085)

Updates the requirements on [pyo3](https://github.com/pyo3/pyo3) to permit the latest version.
- [Release notes](https://github.com/pyo3/pyo3/releases)
- [Changelog](https://github.com/PyO3/pyo3/blob/v0.22.2/CHANGELOG.md)
- [Commits](PyO3/pyo3@v0.21.1...v0.22.2)

---
updated-dependencies:
- dependency-name: pyo3
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* Deprecate read_page_locations() and simplify offset index in `ParquetMetaData` (#6095)

* deprecate read_page_locations

* add to_thrift() to OffsetIndexMetaData

* move valid test into ColumnIndexBuilder::append_histograms

* move update_histogram() inside ColumnMetrics

* Update parquet/src/column/writer/mod.rs

Co-authored-by: Ed Seidl <etseidl@users.noreply.github.com>

* Implement LevelHistograms as a struct

* formatting

* fix error in docs

---------

Signed-off-by: Bugen Zhao <i@bugenzhao.com>
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: Bugen Zhao <i@bugenzhao.com>
Co-authored-by: Xiangpeng Hao <haoxiangpeng123@gmail.com>
Co-authored-by: kamille <caoruiqiu.crq@antgroup.com>
Co-authored-by: Jesse <github@jessebakker.com>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Co-authored-by: Marco Neumann <marco@crepererum.net>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
8 people authored Jul 26, 2024
1 parent 613e93e commit b06ffce
Showing 5 changed files with 550 additions and 37 deletions.
137 changes: 126 additions & 11 deletions parquet/src/column/writer/mod.rs
@@ -33,7 +33,7 @@ use crate::data_type::private::ParquetValueType;
 use crate::data_type::*;
 use crate::encodings::levels::LevelEncoder;
 use crate::errors::{ParquetError, Result};
-use crate::file::metadata::{ColumnIndexBuilder, OffsetIndexBuilder};
+use crate::file::metadata::{ColumnIndexBuilder, LevelHistogram, OffsetIndexBuilder};
 use crate::file::properties::EnabledStatistics;
 use crate::file::statistics::{Statistics, ValueStatistics};
 use crate::file::{
@@ -189,6 +189,54 @@ struct PageMetrics {
     num_buffered_values: u32,
     num_buffered_rows: u32,
     num_page_nulls: u64,
+    repetition_level_histogram: Option<LevelHistogram>,
+    definition_level_histogram: Option<LevelHistogram>,
 }

+impl PageMetrics {
+    fn new() -> Self {
+        Default::default()
+    }
+
+    /// Initialize the repetition level histogram
+    fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
+        self.repetition_level_histogram = LevelHistogram::try_new(max_level);
+        self
+    }
+
+    /// Initialize the definition level histogram
+    fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
+        self.definition_level_histogram = LevelHistogram::try_new(max_level);
+        self
+    }
+
+    /// Resets the state of this `PageMetrics` to the initial state.
+    /// If histograms have been initialized their contents will be reset to zero.
+    fn new_page(&mut self) {
+        self.num_buffered_values = 0;
+        self.num_buffered_rows = 0;
+        self.num_page_nulls = 0;
+        self.repetition_level_histogram
+            .as_mut()
+            .map(LevelHistogram::reset);
+        self.definition_level_histogram
+            .as_mut()
+            .map(LevelHistogram::reset);
+    }
+
+    /// Updates histogram values using provided repetition levels
+    fn update_repetition_level_histogram(&mut self, levels: &[i16]) {
+        if let Some(ref mut rep_hist) = self.repetition_level_histogram {
+            rep_hist.update_from_levels(levels);
+        }
+    }
+
+    /// Updates histogram values using provided definition levels
+    fn update_definition_level_histogram(&mut self, levels: &[i16]) {
+        if let Some(ref mut def_hist) = self.definition_level_histogram {
+            def_hist.update_from_levels(levels);
+        }
+    }
+}
+
 // Metrics per column writer
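
The hunk above calls `LevelHistogram::try_new`, `update_from_levels`, and `reset`, whose definitions live in a file not shown here. As a rough illustration of the counting those calls suggest, here is a minimal sketch. `SketchHistogram` is a hypothetical stand-in, not the crate's type, and the rule that a non-positive `max_level` yields `None` is an assumption of the sketch.

```rust
/// Hypothetical stand-in for the crate's `LevelHistogram`; not the real type.
#[derive(Debug, Clone, PartialEq)]
pub struct SketchHistogram {
    counts: Vec<i64>,
}

impl SketchHistogram {
    /// One bucket per level in `0..=max_level`.
    /// Assumption: when `max_level` is not positive every level is 0,
    /// so no histogram is created.
    pub fn try_new(max_level: i16) -> Option<Self> {
        if max_level > 0 {
            Some(Self {
                counts: vec![0; max_level as usize + 1],
            })
        } else {
            None
        }
    }

    /// Count each level. Levels are assumed to lie in `0..=max_level`;
    /// anything outside that range panics on the index.
    pub fn update_from_levels(&mut self, levels: &[i16]) {
        for &level in levels {
            self.counts[level as usize] += 1;
        }
    }

    /// Zero every bucket while keeping the allocation.
    pub fn reset(&mut self) {
        self.counts.iter_mut().for_each(|c| *c = 0);
    }

    /// Read-only view of the buckets.
    pub fn values(&self) -> &[i64] {
        &self.counts
    }
}
```

With `max_level = 2`, feeding the levels `[0, 2, 2, 1, 2]` leaves the buckets at `[1, 1, 3]`, and `reset` returns them to `[0, 0, 0]` without reallocating.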
@@ -206,13 +254,50 @@ struct ColumnMetrics<T: Default> {
     num_column_nulls: u64,
     column_distinct_count: Option<u64>,
     variable_length_bytes: Option<i64>,
+    repetition_level_histogram: Option<LevelHistogram>,
+    definition_level_histogram: Option<LevelHistogram>,
 }

 impl<T: Default> ColumnMetrics<T> {
     fn new() -> Self {
         Default::default()
     }

+    /// Initialize the repetition level histogram
+    fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
+        self.repetition_level_histogram = LevelHistogram::try_new(max_level);
+        self
+    }
+
+    /// Initialize the definition level histogram
+    fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
+        self.definition_level_histogram = LevelHistogram::try_new(max_level);
+        self
+    }
+
+    /// Sum `page_histogram` into `chunk_histogram`
+    fn update_histogram(
+        chunk_histogram: &mut Option<LevelHistogram>,
+        page_histogram: &Option<LevelHistogram>,
+    ) {
+        if let (Some(page_hist), Some(chunk_hist)) = (page_histogram, chunk_histogram) {
+            chunk_hist.add(page_hist);
+        }
+    }
+
+    /// Sum the provided PageMetrics histograms into the chunk histograms. Does nothing if
+    /// page histograms are not initialized.
+    fn update_from_page_metrics(&mut self, page_metrics: &PageMetrics) {
+        ColumnMetrics::<T>::update_histogram(
+            &mut self.definition_level_histogram,
+            &page_metrics.definition_level_histogram,
+        );
+        ColumnMetrics::<T>::update_histogram(
+            &mut self.repetition_level_histogram,
+            &page_metrics.repetition_level_histogram,
+        );
+    }
+
     /// Sum the provided page variable_length_bytes into the chunk variable_length_bytes
     fn update_variable_length_bytes(&mut self, variable_length_bytes: Option<i64>) {
         if let Some(var_bytes) = variable_length_bytes {
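
`update_histogram` above only sums when both the page and the chunk histogram are present. The sketch below shows that pattern on plain `Option<Vec<i64>>` values. `merge_optional` and `add_counts` are hypothetical names, and the element-wise sum is an assumption about what `LevelHistogram::add` does.

```rust
/// Element-wise sum of `page` into `chunk`.
/// Assumption: both slices have the same number of buckets.
fn add_counts(chunk: &mut [i64], page: &[i64]) {
    assert_eq!(chunk.len(), page.len(), "bucket counts must match");
    for (dst, src) in chunk.iter_mut().zip(page) {
        *dst += src;
    }
}

/// Sum the page buckets into the chunk buckets only when both exist;
/// otherwise leave the chunk untouched.
fn merge_optional(chunk: &mut Option<Vec<i64>>, page: &Option<Vec<i64>>) {
    if let (Some(page), Some(chunk)) = (page, chunk) {
        add_counts(chunk, page);
    }
}
```

Matching on the tuple of references keeps the chunk side mutable and the page side shared, so no clone of either histogram is needed.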
@@ -275,6 +360,19 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         // Used for level information
         encodings.insert(Encoding::RLE);

+        let mut page_metrics = PageMetrics::new();
+        let mut column_metrics = ColumnMetrics::<E::T>::new();
+
+        // Initialize level histograms if collecting page or chunk statistics
+        if statistics_enabled != EnabledStatistics::None {
+            page_metrics = page_metrics
+                .with_repetition_level_histogram(descr.max_rep_level())
+                .with_definition_level_histogram(descr.max_def_level());
+            column_metrics = column_metrics
+                .with_repetition_level_histogram(descr.max_rep_level())
+                .with_definition_level_histogram(descr.max_def_level())
+        }
+
         // Disable column_index_builder if not collecting page statistics.
         let mut column_index_builder = ColumnIndexBuilder::new();
         if statistics_enabled != EnabledStatistics::Page {
@@ -292,12 +390,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
             def_levels_sink: vec![],
             rep_levels_sink: vec![],
             data_pages: VecDeque::new(),
-            page_metrics: PageMetrics {
-                num_buffered_values: 0,
-                num_buffered_rows: 0,
-                num_page_nulls: 0,
-            },
-            column_metrics: ColumnMetrics::<E::T>::new(),
+            page_metrics,
+            column_metrics,
             column_index_builder,
             offset_index_builder: OffsetIndexBuilder::new(),
             encodings,
@@ -547,6 +641,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
                 }
             }

+            // Update histogram
+            self.page_metrics.update_definition_level_histogram(levels);
+
             self.def_levels_sink.extend_from_slice(levels);
             values_to_write
         } else {
@@ -575,6 +672,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
                 self.page_metrics.num_buffered_rows += (level == 0) as u32
             }

+            // Update histogram
+            self.page_metrics.update_repetition_level_histogram(levels);
+
             self.rep_levels_sink.extend_from_slice(levels);
         } else {
             // Each value is exactly one row.
@@ -718,7 +818,14 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
                 }
             }
         }
-        // update the offset index
+
+        // Append page histograms to the `ColumnIndex` histograms
+        self.column_index_builder.append_histograms(
+            &self.page_metrics.repetition_level_histogram,
+            &self.page_metrics.definition_level_histogram,
+        );
+
+        // Update the offset index
         self.offset_index_builder
             .append_row_count(self.page_metrics.num_buffered_rows as i64);

@@ -804,7 +911,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
             values_data.variable_length_bytes,
         );

-        // Update variable_length_bytes in column_metrics
+        // Update histograms and variable_length_bytes in column_metrics
+        self.column_metrics
+            .update_from_page_metrics(&self.page_metrics);
         self.column_metrics
             .update_variable_length_bytes(values_data.variable_length_bytes);

@@ -911,7 +1020,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         // Reset state.
         self.rep_levels_sink.clear();
         self.def_levels_sink.clear();
-        self.page_metrics = PageMetrics::default();
+        self.page_metrics.new_page();

         Ok(())
     }
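
Replacing `PageMetrics::default()` with `new_page()` matters because a derived default sets every `Option` field to `None`, which would discard histograms set up at construction. The sketch below illustrates the difference. `SketchPageMetrics` and its single `level_counts` field are hypothetical simplifications of the struct in this diff.

```rust
/// Hypothetical, simplified stand-in for `PageMetrics`.
#[derive(Debug, Default, PartialEq)]
struct SketchPageMetrics {
    num_buffered_values: u32,
    level_counts: Option<Vec<i64>>,
}

impl SketchPageMetrics {
    /// Allocate buckets for levels `0..=max_level`, or none if `max_level <= 0`.
    fn with_level_counts(mut self, max_level: i16) -> Self {
        self.level_counts = (max_level > 0).then(|| vec![0; max_level as usize + 1]);
        self
    }

    /// Zero the counters in place so an initialized histogram stays `Some`.
    fn new_page(&mut self) {
        self.num_buffered_values = 0;
        if let Some(counts) = self.level_counts.as_mut() {
            counts.iter_mut().for_each(|c| *c = 0);
        }
    }
}

/// Fill in some per-page state, then reset it in place.
fn after_in_place_reset() -> SketchPageMetrics {
    let mut metrics = SketchPageMetrics::default().with_level_counts(1);
    metrics.num_buffered_values = 3;
    metrics.level_counts = Some(vec![2, 1]);
    metrics.new_page();
    metrics
}
```

After `new_page()` the buckets are zeroed but still present, whereas `SketchPageMetrics::default()` would leave `level_counts` as `None` and later updates would become silent no-ops.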
@@ -1019,7 +1128,13 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {

             builder = builder
                 .set_statistics(statistics)
-                .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes);
+                .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes)
+                .set_repetition_level_histogram(
+                    self.column_metrics.repetition_level_histogram.take(),
+                )
+                .set_definition_level_histogram(
+                    self.column_metrics.definition_level_histogram.take(),
+                );
         }

         let metadata = builder.build()?;
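
The `.take()` calls above move each histogram out of `column_metrics` and leave `None` behind, so the builder gets ownership without a clone. Here is a small sketch of that standard-library behaviour. `SketchColumnMetrics` and `hand_off` are hypothetical names used only for illustration.

```rust
/// Hypothetical, simplified stand-in for `ColumnMetrics`.
#[derive(Default)]
struct SketchColumnMetrics {
    level_counts: Option<Vec<i64>>,
}

/// Move the histogram out of the metrics, leaving `None` in its place.
fn hand_off(metrics: &mut SketchColumnMetrics) -> Option<Vec<i64>> {
    metrics.level_counts.take()
}
```

A second call to `hand_off` on the same metrics returns `None`, so this pattern fits a value that is consumed exactly once when the chunk metadata is built.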
2 changes: 2 additions & 0 deletions parquet/src/file/metadata/memory.rs
@@ -99,6 +99,8 @@ impl HeapSize for ColumnChunkMetaData {
             + self.statistics.heap_size()
             + self.encoding_stats.heap_size()
             + self.unencoded_byte_array_data_bytes.heap_size()
+            + self.repetition_level_histogram.heap_size()
+            + self.definition_level_histogram.heap_size()
     }
 }
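
The two added terms make the optional histograms count toward the reported heap usage of `ColumnChunkMetaData`. The sketch below shows one way such accounting can compose over `Option` and `Vec`. `SketchHeapSize` is a hypothetical trait written for this example, and the crate's own `HeapSize` implementations may differ.

```rust
use std::mem::size_of;

/// Hypothetical trait for illustration; not the crate's `HeapSize`.
trait SketchHeapSize {
    /// Bytes owned on the heap, excluding the size of `self` itself.
    fn heap_size(&self) -> usize;
}

impl SketchHeapSize for Vec<i64> {
    fn heap_size(&self) -> usize {
        // Count allocated capacity, not only the initialized length.
        self.capacity() * size_of::<i64>()
    }
}

impl<T: SketchHeapSize> SketchHeapSize for Option<T> {
    fn heap_size(&self) -> usize {
        // `None` owns nothing on the heap.
        self.as_ref().map_or(0, |inner| inner.heap_size())
    }
}
```

Under this scheme an absent histogram contributes zero bytes, so columns without level histograms report the same size as before the change.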
