diff --git a/parquet/regen.sh b/parquet/regen.sh index d1b82108a018..39999c7872cd 100755 --- a/parquet/regen.sh +++ b/parquet/regen.sh @@ -17,7 +17,7 @@ # specific language governing permissions and limitations # under the License. -REVISION=46cc3a0647d301bb9579ca8dd2cc356caf2a72d2 +REVISION=5b564f3c47679526cf72e54f207013f28f53acc4 SOURCE_DIR="$(cd "$(dirname "${BASH_SOURCE[0]:-$0}")" && pwd)" diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs index fc37ebfb4510..7d081e0ac479 100644 --- a/parquet/src/arrow/arrow_writer/byte_array.rs +++ b/parquet/src/arrow/arrow_writer/byte_array.rs @@ -96,6 +96,7 @@ macro_rules! downcast_op { struct FallbackEncoder { encoder: FallbackEncoderImpl, num_values: usize, + variable_length_bytes: i64, } /// The fallback encoder in use @@ -152,6 +153,7 @@ impl FallbackEncoder { Ok(Self { encoder, num_values: 0, + variable_length_bytes: 0, }) } @@ -168,7 +170,8 @@ impl FallbackEncoder { let value = values.value(*idx); let value = value.as_ref(); buffer.extend_from_slice((value.len() as u32).as_bytes()); - buffer.extend_from_slice(value) + buffer.extend_from_slice(value); + self.variable_length_bytes += value.len() as i64; } } FallbackEncoderImpl::DeltaLength { buffer, lengths } => { @@ -177,6 +180,7 @@ impl FallbackEncoder { let value = value.as_ref(); lengths.put(&[value.len() as i32]).unwrap(); buffer.extend_from_slice(value); + self.variable_length_bytes += value.len() as i64; } } FallbackEncoderImpl::Delta { @@ -205,6 +209,7 @@ impl FallbackEncoder { buffer.extend_from_slice(&value[prefix_length..]); prefix_lengths.put(&[prefix_length as i32]).unwrap(); suffix_lengths.put(&[suffix_length as i32]).unwrap(); + self.variable_length_bytes += value.len() as i64; } } } @@ -269,12 +274,16 @@ impl FallbackEncoder { } }; + let variable_length_bytes = Some(self.variable_length_bytes); + self.variable_length_bytes = 0; + Ok(DataPageValues { buf: buf.into(), num_values: std::mem::take(&mut self.num_values), encoding, min_value, max_value, + variable_length_bytes, }) } } @@ -321,6 +330,7 @@ impl Storage for ByteArrayStorage { struct DictEncoder { interner: Interner, indices: Vec, + variable_length_bytes: i64, } impl DictEncoder { @@ -336,6 +346,7 @@ impl DictEncoder { let value = values.value(*idx); let interned = self.interner.intern(value.as_ref()); self.indices.push(interned); + self.variable_length_bytes += value.as_ref().len() as i64; } } @@ -384,12 +395,16 @@ impl DictEncoder { self.indices.clear(); + let variable_length_bytes = Some(self.variable_length_bytes); + self.variable_length_bytes = 0; + DataPageValues { buf: encoder.consume().into(), num_values, encoding: Encoding::RLE_DICTIONARY, min_value, max_value, + variable_length_bytes, } } } diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index e4205b7ef2ce..5a790fa6aff0 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -1538,6 +1538,7 @@ mod tests { vec![row_group_meta], None, Some(vec![offset_index.clone()]), + None, ); let metadata = Arc::new(metadata); diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs index b6c8212608b8..9d01c09040de 100644 --- a/parquet/src/column/writer/encoder.rs +++ b/parquet/src/column/writer/encoder.rs @@ -63,6 +63,7 @@ pub struct DataPageValues { pub encoding: Encoding, pub min_value: Option, pub max_value: Option, + pub variable_length_bytes: Option, } /// A generic encoder of [`ColumnValues`] to data and dictionary pages used by @@ -131,6 +132,7 @@ pub struct ColumnValueEncoderImpl { min_value: Option, max_value: Option, bloom_filter: Option, + variable_length_bytes: Option, } impl ColumnValueEncoderImpl { @@ -150,6 +152,10 @@ impl ColumnValueEncoderImpl { update_min(&self.descr, &min, &mut self.min_value); update_max(&self.descr, &max, &mut self.max_value); } + + if let Some(var_bytes) = T::T::variable_length_bytes(slice) { + *self.variable_length_bytes.get_or_insert(0) += var_bytes; + } } // encode the values into bloom filter if enabled @@ -203,6 +209,7 @@ impl ColumnValueEncoder for ColumnValueEncoderImpl { bloom_filter, min_value: None, max_value: None, + variable_length_bytes: None, }) } @@ -296,6 +303,7 @@ impl ColumnValueEncoder for ColumnValueEncoderImpl { num_values: std::mem::take(&mut self.num_values), min_value: self.min_value.take(), max_value: self.max_value.take(), + variable_length_bytes: self.variable_length_bytes.take(), }) } } diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 8594e59714dc..050da34586f1 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -183,12 +183,96 @@ pub struct ColumnCloseResult { pub offset_index: Option, } +/// Creates a vector to hold level histogram data. Length will be `max_level + 1`. +/// Because histograms are not necessary when `max_level == 0`, this will return +/// `None` in that case. +fn new_histogram(max_level: i16) -> Option> { + if max_level > 0 { + Some(vec![0; max_level as usize + 1]) + } else { + None + } +} + +/// Sum `page_histogram` into `chunk_histogram` +fn update_histogram(chunk_histogram: &mut Option>, page_histogram: &Option>) { + if page_histogram.is_some() && chunk_histogram.is_some() { + let chunk_hist = chunk_histogram.as_mut().unwrap(); + let page_hist = page_histogram.as_ref().unwrap(); + for i in 0..page_hist.len() { + chunk_hist[i] += page_hist[i] + } + } +} + // Metrics per page -#[derive(Default)] struct PageMetrics { num_buffered_values: u32, num_buffered_rows: u32, num_page_nulls: u64, + repetition_level_histogram: Option>, + definition_level_histogram: Option>, +} + +impl PageMetrics { + fn new() -> Self { + PageMetrics { + num_buffered_values: 0, + num_buffered_rows: 0, + num_page_nulls: 0, + repetition_level_histogram: None, + definition_level_histogram: None, + } + } + + /// Initialize the repetition level histogram + fn with_repetition_level_histogram(mut self, max_level: i16) -> Self { + self.repetition_level_histogram = new_histogram(max_level); + self + } + + /// Initialize the definition level histogram + fn with_definition_level_histogram(mut self, max_level: i16) -> Self { + self.definition_level_histogram = new_histogram(max_level); + self + } + + /// Sets all elements of `histogram` to 0 + fn reset_histogram(histogram: &mut Option>) { + if let Some(ref mut hist) = histogram { + for v in hist { + *v = 0 + } + } + } + + /// Resets the state of this `PageMetrics` to the initial state. + /// If histograms have been initialized their contents will be reset to zero. + fn new_page(&mut self) { + self.num_buffered_values = 0; + self.num_buffered_rows = 0; + self.num_page_nulls = 0; + PageMetrics::reset_histogram(&mut self.repetition_level_histogram); + PageMetrics::reset_histogram(&mut self.definition_level_histogram); + } + + /// Updates histogram values using provided repetition levels + fn update_repetition_level_histogram(&mut self, levels: &[i16]) { + if let Some(ref mut rep_hist) = self.repetition_level_histogram { + for &level in levels { + rep_hist[level as usize] += 1; + } + } + } + + /// Updates histogram values using provided definition levels + fn update_definition_level_histogram(&mut self, levels: &[i16]) { + if let Some(ref mut def_hist) = self.definition_level_histogram { + for &level in levels { + def_hist[level as usize] += 1; + } + } + } } // Metrics per column writer @@ -204,6 +288,62 @@ struct ColumnMetrics { max_column_value: Option, num_column_nulls: u64, column_distinct_count: Option, + variable_length_bytes: Option, + repetition_level_histogram: Option>, + definition_level_histogram: Option>, +} + +impl ColumnMetrics { + fn new() -> Self { + ColumnMetrics { + total_bytes_written: 0, + total_rows_written: 0, + total_uncompressed_size: 0, + total_compressed_size: 0, + total_num_values: 0, + dictionary_page_offset: None, + data_page_offset: None, + min_column_value: None, + max_column_value: None, + num_column_nulls: 0, + column_distinct_count: None, + variable_length_bytes: None, + repetition_level_histogram: None, + definition_level_histogram: None, + } + } + + /// Initialize the repetition level histogram + fn with_repetition_level_histogram(mut self, max_level: i16) -> Self { + self.repetition_level_histogram = new_histogram(max_level); + self + } + + /// Initialize the definition level histogram + fn with_definition_level_histogram(mut self, max_level: i16) -> Self { + self.definition_level_histogram = new_histogram(max_level); + self + } + + /// Sum the provided PageMetrics histograms into the chunk histograms. Does nothing if + /// page histograms are not initialized. + fn update_from_page_metrics(&mut self, page_metrics: &PageMetrics) { + update_histogram( + &mut self.definition_level_histogram, + &page_metrics.definition_level_histogram, + ); + update_histogram( + &mut self.repetition_level_histogram, + &page_metrics.repetition_level_histogram, + ); + } + + /// Sum the provided page variable_length_bytes into the chunk variable_length_bytes + fn update_variable_length_bytes(&mut self, variable_length_bytes: &Option) { + if let Some(var_bytes) = variable_length_bytes { + *self.variable_length_bytes.get_or_insert(0) += var_bytes; + } + } } /// Typed column writer for a primitive column. @@ -260,6 +400,19 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { // Used for level information encodings.insert(Encoding::RLE); + let mut page_metrics = PageMetrics::new(); + let mut column_metrics = ColumnMetrics::::new(); + + // Initialize level histograms if collecting page or chunk statistics + if statistics_enabled != EnabledStatistics::None { + page_metrics = page_metrics + .with_repetition_level_histogram(descr.max_rep_level()) + .with_definition_level_histogram(descr.max_def_level()); + column_metrics = column_metrics + .with_repetition_level_histogram(descr.max_rep_level()) + .with_definition_level_histogram(descr.max_def_level()) + } + Self { descr, props, @@ -271,24 +424,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { def_levels_sink: vec![], rep_levels_sink: vec![], data_pages: VecDeque::new(), - page_metrics: PageMetrics { - num_buffered_values: 0, - num_buffered_rows: 0, - num_page_nulls: 0, - }, - column_metrics: ColumnMetrics { - total_bytes_written: 0, - total_rows_written: 0, - total_uncompressed_size: 0, - total_compressed_size: 0, - total_num_values: 0, - dictionary_page_offset: None, - data_page_offset: None, - min_column_value: None, - max_column_value: None, - num_column_nulls: 0, - column_distinct_count: None, - }, + page_metrics, + column_metrics, column_index_builder: ColumnIndexBuilder::new(), offset_index_builder: OffsetIndexBuilder::new(), encodings, @@ -538,6 +675,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { } } + // Update histogram + self.page_metrics.update_definition_level_histogram(levels); + self.def_levels_sink.extend_from_slice(levels); values_to_write } else { @@ -566,6 +706,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { self.page_metrics.num_buffered_rows += (level == 0) as u32 } + // Update histogram + self.page_metrics.update_repetition_level_histogram(levels); + self.rep_levels_sink.extend_from_slice(levels); } else { // Each value is exactly one row. @@ -634,7 +777,11 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { } /// Update the column index and offset index when adding the data page - fn update_column_offset_index(&mut self, page_statistics: Option<&ValueStatistics>) { + fn update_column_offset_index( + &mut self, + page_statistics: Option<&ValueStatistics>, + page_variable_length_bytes: Option, + ) { // update the column index let null_page = (self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls; @@ -705,9 +852,21 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { } } } + + // update histograms + if self.column_index_builder.valid() { + self.column_index_builder.append_histograms( + &self.page_metrics.repetition_level_histogram, + &self.page_metrics.definition_level_histogram, + ); + } + // update the offset index self.offset_index_builder .append_row_count(self.page_metrics.num_buffered_rows as i64); + + self.offset_index_builder + .append_unencoded_byte_array_data_bytes(page_variable_length_bytes); } /// Determine if we should allow truncating min/max values for this column's statistics @@ -783,7 +942,17 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { }; // update column and offset index - self.update_column_offset_index(page_statistics.as_ref()); + self.update_column_offset_index( + page_statistics.as_ref(), + values_data.variable_length_bytes, + ); + + // Update histograms and variable_length_bytes in column_metrics + self.column_metrics + .update_from_page_metrics(&self.page_metrics); + self.column_metrics + .update_variable_length_bytes(&values_data.variable_length_bytes); + let page_statistics = page_statistics.map(Statistics::from); let compressed_page = match self.props.writer_version() { @@ -887,7 +1056,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { // Reset state. self.rep_levels_sink.clear(); self.def_levels_sink.clear(); - self.page_metrics = PageMetrics::default(); + self.page_metrics.new_page(); Ok(()) } @@ -993,7 +1162,15 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { stats => stats, }; - builder = builder.set_statistics(statistics); + builder = builder + .set_statistics(statistics) + .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes) + .set_repetition_level_histogram( + self.column_metrics.repetition_level_histogram.take(), + ) + .set_definition_level_histogram( + self.column_metrics.definition_level_histogram.take(), + ); } let metadata = builder.build()?; diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs index b85a75cfd410..01e92115c45b 100644 --- a/parquet/src/data_type.rs +++ b/parquet/src/data_type.rs @@ -644,6 +644,13 @@ pub(crate) mod private { (std::mem::size_of::(), 1) } + /// Return the number of variable length bytes in a given slice of data + /// + /// Returns the sum of lengths for BYTE_ARRAY data, and None for all other data types + fn variable_length_bytes(_: &[Self]) -> Option { + None + } + /// Return the value as i64 if possible /// /// This is essentially the same as `std::convert::TryInto` but can't be @@ -956,6 +963,10 @@ pub(crate) mod private { Ok(num_values) } + fn variable_length_bytes(values: &[Self]) -> Option { + Some(values.iter().map(|x| x.len() as i64).sum()) + } + fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result { let data = decoder .data diff --git a/parquet/src/file/metadata/memory.rs b/parquet/src/file/metadata/memory.rs index 57b2f7eec0c2..22ee228bcf40 100644 --- a/parquet/src/file/metadata/memory.rs +++ b/parquet/src/file/metadata/memory.rs @@ -97,6 +97,9 @@ impl HeapSize for ColumnChunkMetaData { + self.compression.heap_size() + self.statistics.heap_size() + self.encoding_stats.heap_size() + + self.unencoded_byte_array_data_bytes.heap_size() + + self.repetition_level_histogram.heap_size() + + self.definition_level_histogram.heap_size() } } diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs index 40922d52bfd4..e2afff24ae64 100644 --- a/parquet/src/file/metadata/mod.rs +++ b/parquet/src/file/metadata/mod.rs @@ -36,7 +36,7 @@ use std::sync::Arc; use crate::format::{ BoundaryOrder, ColumnChunk, ColumnIndex, ColumnMetaData, OffsetIndex, PageLocation, RowGroup, - SortingColumn, + SizeStatistics, SortingColumn, }; use crate::basic::{ColumnOrder, Compression, Encoding, Type}; @@ -96,6 +96,8 @@ pub struct ParquetMetaData { column_index: Option, /// Offset index for all each page in each column chunk offset_index: Option, + /// `unencoded_byte_array_data_bytes` from the offset index + unencoded_byte_array_data_bytes: Option>>>>, } impl ParquetMetaData { @@ -107,6 +109,7 @@ impl ParquetMetaData { row_groups, column_index: None, offset_index: None, + unencoded_byte_array_data_bytes: None, } } @@ -117,12 +120,14 @@ impl ParquetMetaData { row_groups: Vec, column_index: Option, offset_index: Option, + unencoded_byte_array_data_bytes: Option>>>>, ) -> Self { ParquetMetaData { file_metadata, row_groups, column_index, offset_index, + unencoded_byte_array_data_bytes, } } @@ -179,6 +184,16 @@ impl ParquetMetaData { self.offset_index.as_ref() } + /// Returns `unencoded_byte_array_data_bytes` from the offset indexes in this file, if loaded + /// + /// Returns `None` if the parquet file does not have a `OffsetIndex` or + /// [ArrowReaderOptions::with_page_index] was set to false. + /// + /// [ArrowReaderOptions::with_page_index]: https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.ArrowReaderOptions.html#method.with_page_index + pub fn unencoded_byte_array_data_bytes(&self) -> Option<&Vec>>>> { + self.unencoded_byte_array_data_bytes.as_ref() + } + /// Estimate of the bytes allocated to store `ParquetMetadata` /// /// # Notes: @@ -199,6 +214,7 @@ impl ParquetMetaData { + self.row_groups.heap_size() + self.column_index.heap_size() + self.offset_index.heap_size() + + self.unencoded_byte_array_data_bytes.heap_size() } /// Override the column index @@ -538,6 +554,9 @@ pub struct ColumnChunkMetaData { offset_index_length: Option, column_index_offset: Option, column_index_length: Option, + unencoded_byte_array_data_bytes: Option, + repetition_level_histogram: Option>, + definition_level_histogram: Option>, } /// Represents common operations for a column chunk. @@ -690,6 +709,32 @@ impl ColumnChunkMetaData { Some(offset..(offset + length)) } + /// Returns the number of bytes of variable length data after decoding. + /// + /// Only set for BYTE_ARRAY columns. This field may not be set by older + /// writers. + pub fn unencoded_byte_array_data_bytes(&self) -> Option { + self.unencoded_byte_array_data_bytes + } + + /// Returns the repetition level histogram. + /// + /// The returned value `vec[i]` is how many values are at repetition level `i`. For example, + /// `vec[0]` indicates how many rows the page contains. + /// This field may not be set by older writers. + pub fn repetition_level_histogram(&self) -> Option<&Vec> { + self.repetition_level_histogram.as_ref() + } + + /// Returns the definition level histogram. + /// + /// The returned value `vec[i]` is how many values are at definition level `i`. For example, + /// `vec[max_definition_level-1]` indicates how many non-null values are present in the page. + /// This field may not be set by older writers. + pub fn definition_level_histogram(&self) -> Option<&Vec> { + self.definition_level_histogram.as_ref() + } + /// Method to convert from Thrift. pub fn from_thrift(column_descr: ColumnDescPtr, cc: ColumnChunk) -> Result { if cc.meta_data.is_none() { @@ -727,6 +772,19 @@ impl ColumnChunkMetaData { let offset_index_length = cc.offset_index_length; let column_index_offset = cc.column_index_offset; let column_index_length = cc.column_index_length; + let ( + unencoded_byte_array_data_bytes, + repetition_level_histogram, + definition_level_histogram, + ) = if let Some(size_stats) = col_metadata.size_statistics { + ( + size_stats.unencoded_byte_array_data_bytes, + size_stats.repetition_level_histogram, + size_stats.definition_level_histogram, + ) + } else { + (None, None, None) + }; let result = ColumnChunkMetaData { column_descr, @@ -748,6 +806,9 @@ impl ColumnChunkMetaData { offset_index_length, column_index_offset, column_index_length, + unencoded_byte_array_data_bytes, + repetition_level_histogram, + definition_level_histogram, }; Ok(result) } @@ -771,6 +832,19 @@ impl ColumnChunkMetaData { /// Method to convert to Thrift `ColumnMetaData` pub fn to_column_metadata_thrift(&self) -> ColumnMetaData { + let size_statistics = if self.unencoded_byte_array_data_bytes.is_some() + || self.repetition_level_histogram.is_some() + || self.definition_level_histogram.is_some() + { + Some(SizeStatistics { + unencoded_byte_array_data_bytes: self.unencoded_byte_array_data_bytes, + repetition_level_histogram: self.repetition_level_histogram.clone(), + definition_level_histogram: self.definition_level_histogram.clone(), + }) + } else { + None + }; + ColumnMetaData { type_: self.column_type().into(), encodings: self.encodings().iter().map(|&v| v.into()).collect(), @@ -790,6 +864,7 @@ impl ColumnChunkMetaData { .map(|vec| vec.iter().map(page_encoding_stats::to_thrift).collect()), bloom_filter_offset: self.bloom_filter_offset, bloom_filter_length: self.bloom_filter_length, + size_statistics, } } @@ -825,6 +900,9 @@ impl ColumnChunkMetaDataBuilder { offset_index_length: None, column_index_offset: None, column_index_length: None, + unencoded_byte_array_data_bytes: None, + repetition_level_histogram: None, + definition_level_histogram: None, }) } @@ -936,6 +1014,24 @@ impl ColumnChunkMetaDataBuilder { self } + /// Sets optional length of variable length data in bytes. + pub fn set_unencoded_byte_array_data_bytes(mut self, value: Option) -> Self { + self.0.unencoded_byte_array_data_bytes = value; + self + } + + /// Sets optional repetition level histogram + pub fn set_repetition_level_histogram(mut self, value: Option>) -> Self { + self.0.repetition_level_histogram = value; + self + } + + /// Sets optional repetition level histogram + pub fn set_definition_level_histogram(mut self, value: Option>) -> Self { + self.0.definition_level_histogram = value; + self + } + /// Builds column chunk metadata. pub fn build(self) -> Result { Ok(self.0) @@ -949,6 +1045,8 @@ pub struct ColumnIndexBuilder { max_values: Vec>, null_counts: Vec, boundary_order: BoundaryOrder, + repetition_level_histograms: Option>, + definition_level_histograms: Option>, // If one page can't get build index, need to ignore all index in this column valid: bool, } @@ -967,6 +1065,8 @@ impl ColumnIndexBuilder { max_values: Vec::new(), null_counts: Vec::new(), boundary_order: BoundaryOrder::UNORDERED, + repetition_level_histograms: None, + definition_level_histograms: None, valid: true, } } @@ -984,6 +1084,23 @@ impl ColumnIndexBuilder { self.null_counts.push(null_count); } + pub fn append_histograms( + &mut self, + repetition_level_histogram: &Option>, + definition_level_histogram: &Option>, + ) { + if let Some(ref rep_lvl_hist) = repetition_level_histogram { + let hist = self.repetition_level_histograms.get_or_insert(Vec::new()); + hist.reserve(rep_lvl_hist.len()); + hist.extend(rep_lvl_hist); + } + if let Some(ref def_lvl_hist) = definition_level_histogram { + let hist = self.definition_level_histograms.get_or_insert(Vec::new()); + hist.reserve(def_lvl_hist.len()); + hist.extend(def_lvl_hist); + } + } + pub fn set_boundary_order(&mut self, boundary_order: BoundaryOrder) { self.boundary_order = boundary_order; } @@ -1004,6 +1121,8 @@ impl ColumnIndexBuilder { self.max_values, self.boundary_order, self.null_counts, + self.repetition_level_histograms, + self.definition_level_histograms, ) } } @@ -1013,6 +1132,7 @@ pub struct OffsetIndexBuilder { offset_array: Vec, compressed_page_size_array: Vec, first_row_index_array: Vec, + unencoded_byte_array_data_bytes_array: Option>, current_first_row_index: i64, } @@ -1028,6 +1148,7 @@ impl OffsetIndexBuilder { offset_array: Vec::new(), compressed_page_size_array: Vec::new(), first_row_index_array: Vec::new(), + unencoded_byte_array_data_bytes_array: None, current_first_row_index: 0, } } @@ -1043,6 +1164,17 @@ impl OffsetIndexBuilder { self.compressed_page_size_array.push(compressed_page_size); } + pub fn append_unencoded_byte_array_data_bytes( + &mut self, + unencoded_byte_array_data_bytes: Option, + ) { + if let Some(val) = unencoded_byte_array_data_bytes { + self.unencoded_byte_array_data_bytes_array + .get_or_insert(Vec::new()) + .push(val); + } + } + /// Build and get the thrift metadata of offset index pub fn build_to_thrift(self) -> OffsetIndex { let locations = self @@ -1052,7 +1184,7 @@ impl OffsetIndexBuilder { .zip(self.first_row_index_array.iter()) .map(|((offset, size), row_index)| PageLocation::new(*offset, *size, *row_index)) .collect::>(); - OffsetIndex::new(locations) + OffsetIndex::new(locations, self.unencoded_byte_array_data_bytes_array) } } @@ -1203,6 +1335,9 @@ mod tests { .set_offset_index_length(Some(25)) .set_column_index_offset(Some(8000)) .set_column_index_length(Some(25)) + .set_unencoded_byte_array_data_bytes(Some(2000)) + .set_repetition_level_histogram(Some(vec![100, 100])) + .set_definition_level_histogram(Some(vec![0, 200])) .build() .unwrap(); @@ -1290,7 +1425,7 @@ mod tests { column_orders, ); let parquet_meta = ParquetMetaData::new(file_metadata.clone(), row_group_meta.clone()); - let base_expected_size = 1320; + let base_expected_size = 1472; assert_eq!(parquet_meta.memory_size(), base_expected_size); let mut column_index = ColumnIndexBuilder::new(); @@ -1307,9 +1442,10 @@ mod tests { vec![PageLocation::new(1, 2, 3)], vec![PageLocation::new(1, 2, 3)], ]]), + Some(vec![vec![Some(vec![10, 20, 30])]]), ); - let bigger_expected_size = 2304; + let bigger_expected_size = 2848; // more set fields means more memory usage assert!(bigger_expected_size > base_expected_size); assert_eq!(parquet_meta.memory_size(), bigger_expected_size); diff --git a/parquet/src/file/page_index/index.rs b/parquet/src/file/page_index/index.rs index 71fb47afa960..072e311d7623 100644 --- a/parquet/src/file/page_index/index.rs +++ b/parquet/src/file/page_index/index.rs @@ -41,6 +41,22 @@ pub struct PageIndex { pub max: Option, /// Null values in the page pub null_count: Option, + + // NOTE: histograms could be stored in NativeIndex, and these could then be + // slices of the histogram vector. That will make conversion back to thrift much easier. + // Nevermind...this would require propagating lifetimes up the metadata tree. + + /// Repetition level histogram for the page + /// + /// `repetition_level_histogram[i]` is a count of how many values are at repetition level `i`. + /// For example, `repetition_level_histogram[0]` indicates how many rows the page contains. + pub repetition_level_histogram: Option>, + /// Definition level histogram for the page + /// + /// `definition_level_histogram[i]` is a count of how many values are at definition level `i`. + /// For example, `definition_level_histogram[max_definition_level-1]` indicates how many + /// non-null values are present in the page. + pub definition_level_histogram: Option>, } impl PageIndex { @@ -53,6 +69,12 @@ impl PageIndex { pub fn null_count(&self) -> Option { self.null_count } + pub fn repetition_level_histogram(&self) -> Option<&Vec> { + self.repetition_level_histogram.as_ref() + } + pub fn definition_level_histogram(&self) -> Option<&Vec> { + self.definition_level_histogram.as_ref() + } } impl PageIndex @@ -141,26 +163,57 @@ impl NativeIndex { .map(|x| x.into_iter().map(Some).collect::>()) .unwrap_or_else(|| vec![None; len]); + // histograms are a 1D array encoding a 2D num_pages X num_levels matrix. + let to_page_histograms = |opt_hist: Option>| { + if let Some(hist) = opt_hist { + // TODO: should we assert (hist.len() % len) == 0? + let num_levels = hist.len() / len; + let mut res = Vec::with_capacity(len); + for i in 0..len { + let page_idx = i * num_levels; + let page_hist = hist[page_idx..page_idx + num_levels].to_vec(); + res.push(Some(page_hist)); + } + res + } else { + vec![None; len] + } + }; + + let rep_hists: Vec>> = + to_page_histograms(index.repetition_level_histograms); + let def_hists: Vec>> = + to_page_histograms(index.definition_level_histograms); + let indexes = index .min_values .iter() .zip(index.max_values.into_iter()) .zip(index.null_pages.into_iter()) .zip(null_counts.into_iter()) - .map(|(((min, max), is_null), null_count)| { - let (min, max) = if is_null { - (None, None) - } else { - let min = min.as_slice(); - let max = max.as_slice(); - (Some(from_le_slice::(min)), Some(from_le_slice::(max))) - }; - Ok(PageIndex { - min, - max, - null_count, - }) - }) + .zip(rep_hists.into_iter()) + .zip(def_hists.into_iter()) + .map( + |( + ((((min, max), is_null), null_count), repetition_level_histogram), + definition_level_histogram, + )| { + let (min, max) = if is_null { + (None, None) + } else { + let min = min.as_slice(); + let max = max.as_slice(); + (Some(from_le_slice::(min)), Some(from_le_slice::(max))) + }; + Ok(PageIndex { + min, + max, + null_count, + repetition_level_histogram, + definition_level_histogram, + }) + }, + ) .collect::, ParquetError>>()?; Ok(Self { @@ -180,6 +233,8 @@ mod tests { min: Some(-123), max: Some(234), null_count: Some(0), + repetition_level_histogram: Some(vec![1, 2]), + definition_level_histogram: Some(vec![1, 2, 3]), }; assert_eq!(page_index.min().unwrap(), &-123); @@ -187,6 +242,11 @@ mod tests { assert_eq!(page_index.min_bytes().unwrap(), (-123).as_bytes()); assert_eq!(page_index.max_bytes().unwrap(), 234.as_bytes()); assert_eq!(page_index.null_count().unwrap(), 0); + assert_eq!(page_index.repetition_level_histogram(), Some(&vec![1, 2])); + assert_eq!( + page_index.definition_level_histogram(), + Some(&vec![1, 2, 3]) + ); } #[test] @@ -195,6 +255,8 @@ mod tests { min: None, max: None, null_count: None, + repetition_level_histogram: None, + definition_level_histogram: None, }; assert_eq!(page_index.min(), None); @@ -202,5 +264,7 @@ mod tests { assert_eq!(page_index.min_bytes(), None); assert_eq!(page_index.max_bytes(), None); assert_eq!(page_index.null_count(), None); + assert_eq!(page_index.repetition_level_histogram(), None); + assert_eq!(page_index.definition_level_histogram(), None); } } diff --git a/parquet/src/file/page_index/index_reader.rs b/parquet/src/file/page_index/index_reader.rs index 2ddf826fb022..7358c9626b03 100644 --- a/parquet/src/file/page_index/index_reader.rs +++ b/parquet/src/file/page_index/index_reader.rs @@ -22,6 +22,7 @@ use crate::data_type::Int96; use crate::errors::ParquetError; use crate::file::metadata::ColumnChunkMetaData; use crate::file::page_index::index::{Index, NativeIndex}; +use crate::file::page_index::offset_index::ParquetOffsetIndex; use crate::file::reader::ChunkReader; use crate::format::{ColumnIndex, OffsetIndex, PageLocation}; use crate::thrift::{TCompactSliceInputProtocol, TSerializable}; @@ -45,9 +46,9 @@ pub(crate) fn acc_range(a: Option>, b: Option>) -> Opt /// Returns an empty vector if this row group does not contain a /// [`ColumnIndex`]. /// -/// See [Column Index Documentation] for more details. +/// See [Page Index Documentation] for more details. /// -/// [Column Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md +/// [Page Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md pub fn read_columns_indexes( reader: &R, chunks: &[ColumnChunkMetaData], @@ -81,9 +82,9 @@ pub fn read_columns_indexes( /// Return an empty vector if this row group does not contain an /// [`OffsetIndex]`. /// -/// See [Column Index Documentation] for more details. +/// See [Page Index Documentation] for more details. /// -/// [Column Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md +/// [Page Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md pub fn read_pages_locations( reader: &R, chunks: &[ColumnChunkMetaData], @@ -109,6 +110,48 @@ pub fn read_pages_locations( .collect() } +/// Reads per-column [`ParquetOffsetIndex`] for all columns of a row group by +/// decoding [`OffsetIndex`] . +/// +/// Returns a vector of `index[column_number]`. +/// +/// Returns an empty vector if this row group does not contain an +/// [`OffsetIndex`]. +/// +/// See [Page Index Documentation] for more details. +/// +/// [Page Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md +pub fn read_offset_indexes( + reader: &R, + chunks: &[ColumnChunkMetaData], +) -> Result, ParquetError> { + let fetch = chunks + .iter() + .fold(None, |range, c| acc_range(range, c.offset_index_range())); + + let fetch = match fetch { + Some(r) => r, + None => return Ok(vec![]), + }; + + let bytes = reader.get_bytes(fetch.start as _, fetch.end - fetch.start)?; + let get = |r: Range| &bytes[(r.start - fetch.start)..(r.end - fetch.start)]; + + chunks + .iter() + .map(|c| match c.offset_index_range() { + Some(r) => decode_full_offset_index(get(r)), + None => Err(general_err!("missing offset index")), + }) + .collect() +} + +pub(crate) fn decode_full_offset_index(data: &[u8]) -> Result { + let mut prot = TCompactSliceInputProtocol::new(data); + let offset = OffsetIndex::read_from_in_protocol(&mut prot)?; + ParquetOffsetIndex::try_new(offset) +} + pub(crate) fn decode_offset_index(data: &[u8]) -> Result, ParquetError> { let mut prot = TCompactSliceInputProtocol::new(data); let offset = OffsetIndex::read_from_in_protocol(&mut prot)?; diff --git a/parquet/src/file/page_index/mod.rs b/parquet/src/file/page_index/mod.rs index 9372645d76ee..a8077896db34 100644 --- a/parquet/src/file/page_index/mod.rs +++ b/parquet/src/file/page_index/mod.rs @@ -21,3 +21,4 @@ pub mod index; pub mod index_reader; +pub mod offset_index; diff --git a/parquet/src/file/page_index/offset_index.rs b/parquet/src/file/page_index/offset_index.rs new file mode 100644 index 000000000000..aefaab5f91fc --- /dev/null +++ b/parquet/src/file/page_index/offset_index.rs @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`ParquetOffsetIndex`] structure holding decoded [`OffsetIndex`] information + +use crate::errors::ParquetError; +use crate::format::{OffsetIndex, PageLocation}; + +/// [`OffsetIndex`] information for a column chunk. Contains offsets and sizes for each page +/// in the chunk. Optionally stores fully decoded page sizes for BYTE_ARRAY columns. +#[derive(Debug, Clone, PartialEq)] +pub struct ParquetOffsetIndex { + pub page_locations: Vec, + pub unencoded_byte_array_data_bytes: Option>, +} + +impl ParquetOffsetIndex { + /// Creates a new [`ParquetOffsetIndex`] from an [`OffsetIndex`]. + pub(crate) fn try_new(index: OffsetIndex) -> Result { + Ok(Self { + page_locations: index.page_locations, + unencoded_byte_array_data_bytes: index.unencoded_byte_array_data_bytes, + }) + } + + /// Vector of [`PageLocation`] objects, one per page in the chunk. + pub fn page_locations(&self) -> &Vec { + &self.page_locations + } + + /// Optional vector of unencoded page sizes, one per page in the chunk. Only defined + /// for BYTE_ARRAY columns. + pub fn unencoded_byte_array_data_bytes(&self) -> Option<&Vec> { + self.unencoded_byte_array_data_bytes.as_ref() + } + + // FIXME(ets): need to add a `to_offset_index` method to convert back to thrift +} diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index ac7d2d287488..65b6ebf2ec98 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -211,12 +211,22 @@ impl SerializedFileReader { if options.enable_page_index { let mut columns_indexes = vec![]; let mut offset_indexes = vec![]; + let mut unenc_byte_sizes = vec![]; for rg in &mut filtered_row_groups { let column_index = index_reader::read_columns_indexes(&chunk_reader, rg.columns())?; - let offset_index = index_reader::read_pages_locations(&chunk_reader, rg.columns())?; + let offset_index = index_reader::read_offset_indexes(&chunk_reader, rg.columns())?; + + // split offset_index into two vectors to not break API + let mut page_locations = vec![]; + let mut unenc_bytes = vec![]; + offset_index.into_iter().for_each(|index| { + page_locations.push(index.page_locations); + unenc_bytes.push(index.unencoded_byte_array_data_bytes); + }); columns_indexes.push(column_index); - offset_indexes.push(offset_index); + offset_indexes.push(page_locations); + unenc_byte_sizes.push(unenc_bytes); } Ok(Self { @@ -226,6 +236,7 @@ impl SerializedFileReader { filtered_row_groups, Some(columns_indexes), Some(offset_indexes), + Some(unenc_byte_sizes), )), props: Arc::new(options.props), }) diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index 7806384cdb52..232b197e51d5 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -636,8 +636,15 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> { .set_total_uncompressed_size(metadata.uncompressed_size()) .set_num_values(metadata.num_values()) .set_data_page_offset(map_offset(src_data_offset)) - .set_dictionary_page_offset(src_dictionary_offset.map(map_offset)); + .set_dictionary_page_offset(src_dictionary_offset.map(map_offset)) + .set_unencoded_byte_array_data_bytes(metadata.unencoded_byte_array_data_bytes()); + if let Some(rep_hist) = metadata.repetition_level_histogram() { + builder = builder.set_repetition_level_histogram(Some(rep_hist.clone())) + } + if let Some(def_hist) = metadata.definition_level_histogram() { + builder = builder.set_definition_level_histogram(Some(def_hist.clone())) + } if let Some(statistics) = metadata.statistics() { builder = builder.set_statistics(statistics.clone()) } @@ -804,7 +811,7 @@ mod tests { use crate::column::page::{Page, PageReader}; use crate::column::reader::get_typed_column_reader; use crate::compression::{create_codec, Codec, CodecOptionsBuilder}; - use crate::data_type::{BoolType, Int32Type}; + use crate::data_type::{BoolType, ByteArrayType, Int32Type}; use crate::file::page_index::index::Index; use crate::file::properties::EnabledStatistics; use crate::file::serialized_reader::ReadOptionsBuilder; @@ -817,6 +824,7 @@ mod tests { use crate::record::{Row, RowAccessor}; use crate::schema::parser::parse_message_type; use crate::schema::types::{ColumnDescriptor, ColumnPath}; + use crate::util::test_common::rand_gen::RandGen; #[test] fn test_row_group_writer_error_not_all_columns_written() { @@ -1828,4 +1836,222 @@ mod tests { let b_idx = &column_index[0][1]; assert!(matches!(b_idx, Index::NONE), "{b_idx:?}"); } + + #[test] + fn test_byte_array_size_statistics() { + let message_type = " + message test_schema { + OPTIONAL BYTE_ARRAY a (UTF8); + } + "; + let schema = Arc::new(parse_message_type(message_type).unwrap()); + let data = ByteArrayType::gen_vec(32, 7); + let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1]; + let unenc_size: i64 = data.iter().map(|x| x.len() as i64).sum(); + let file: File = tempfile::tempfile().unwrap(); + let props = Arc::new( + WriterProperties::builder() + .set_statistics_enabled(EnabledStatistics::Page) + .build(), + ); + + let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap(); + let mut row_group_writer = writer.next_row_group().unwrap(); + + let mut col_writer = row_group_writer.next_column().unwrap().unwrap(); + col_writer + .typed::() + .write_batch(&data, Some(&def_levels), None) + .unwrap(); + col_writer.close().unwrap(); + row_group_writer.close().unwrap(); + let file_metadata = writer.close().unwrap(); + + assert_eq!(file_metadata.row_groups.len(), 1); + assert_eq!(file_metadata.row_groups[0].columns.len(), 1); + assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some()); + + let check_def_hist = |def_hist: &Vec| { + assert_eq!(def_hist.len(), 2); + assert_eq!(def_hist[0], 3); + assert_eq!(def_hist[1], 7); + }; + + assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some()); + let meta_data = file_metadata.row_groups[0].columns[0] + .meta_data + .as_ref() + .unwrap(); + assert!(meta_data.size_statistics.is_some()); + let size_stats = meta_data.size_statistics.as_ref().unwrap(); + + assert!(size_stats.repetition_level_histogram.is_none()); + assert!(size_stats.definition_level_histogram.is_some()); + assert!(size_stats.unencoded_byte_array_data_bytes.is_some()); + assert_eq!( + unenc_size, + size_stats.unencoded_byte_array_data_bytes.unwrap() + ); + check_def_hist(size_stats.definition_level_histogram.as_ref().unwrap()); + + // check that the read metadata is also correct + let options = ReadOptionsBuilder::new().with_page_index().build(); + let reader = SerializedFileReader::new_with_options(file, options).unwrap(); + + let rfile_metadata = reader.metadata().file_metadata(); + assert_eq!(rfile_metadata.num_rows(), file_metadata.num_rows); + assert_eq!(reader.num_row_groups(), 1); + let rowgroup = reader.get_row_group(0).unwrap(); + assert_eq!(rowgroup.num_columns(), 1); + let column = rowgroup.metadata().column(0); + assert!(column.definition_level_histogram().is_some()); + assert!(column.repetition_level_histogram().is_none()); + assert!(column.unencoded_byte_array_data_bytes().is_some()); + check_def_hist(column.definition_level_histogram().unwrap()); + assert_eq!( + unenc_size, + column.unencoded_byte_array_data_bytes().unwrap() + ); + + // check histogram in column index as well + assert!(reader.metadata().column_index().is_some()); + let column_index = reader.metadata().column_index().unwrap(); + assert_eq!(column_index.len(), 1); + assert_eq!(column_index[0].len(), 1); + let col_idx = if let Index::BYTE_ARRAY(index) = &column_index[0][0] { + assert_eq!(index.indexes.len(), 1); + &index.indexes[0] + } else { + unreachable!() + }; + + assert!(col_idx.repetition_level_histogram().is_none()); + assert!(col_idx.definition_level_histogram().is_some()); + check_def_hist(col_idx.definition_level_histogram().unwrap()); + + assert!(reader + .metadata() + .unencoded_byte_array_data_bytes() + .is_some()); + let unenc_sizes = reader.metadata().unencoded_byte_array_data_bytes().unwrap(); + assert_eq!(unenc_sizes.len(), 1); + assert_eq!(unenc_sizes[0].len(), 1); + assert!(unenc_sizes[0][0].is_some()); + let page_sizes = unenc_sizes[0][0].as_ref().unwrap(); + assert_eq!(page_sizes.len(), 1); + assert_eq!(page_sizes[0], unenc_size); + } + + #[test] + fn test_size_statistics_with_repetition_and_nulls() { + let message_type = " + message test_schema { + OPTIONAL group i32_list (LIST) { + REPEATED group list { + OPTIONAL INT32 element; + } + } + } + "; + // column is: + // row 0: [1, 2] + // row 1: NULL + // row 2: [4, NULL] + // row 3: [] + // row 4: [7, 8, 9, 10] + let schema = Arc::new(parse_message_type(message_type).unwrap()); + let data = [1, 2, 4, 7, 8, 9, 10]; + let def_levels = [3, 3, 0, 3, 2, 1, 3, 3, 3, 3]; + let rep_levels = [0, 1, 0, 0, 1, 0, 0, 1, 1, 1]; + let file = tempfile::tempfile().unwrap(); + let props = Arc::new( + WriterProperties::builder() + .set_statistics_enabled(EnabledStatistics::Page) + .build(), + ); + let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap(); + let mut row_group_writer = writer.next_row_group().unwrap(); + + let mut col_writer = row_group_writer.next_column().unwrap().unwrap(); + col_writer + .typed::() + .write_batch(&data, Some(&def_levels), Some(&rep_levels)) + .unwrap(); + col_writer.close().unwrap(); + row_group_writer.close().unwrap(); + let file_metadata = writer.close().unwrap(); + + assert_eq!(file_metadata.row_groups.len(), 1); + assert_eq!(file_metadata.row_groups[0].columns.len(), 1); + assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some()); + + let check_def_hist = |def_hist: &Vec| { + assert_eq!(def_hist.len(), 4); + assert_eq!(def_hist[0], 1); + assert_eq!(def_hist[1], 1); + assert_eq!(def_hist[2], 1); + assert_eq!(def_hist[3], 7); + }; + + let check_rep_hist = |rep_hist: &Vec| { + assert_eq!(rep_hist.len(), 2); + assert_eq!(rep_hist[0], 5); + assert_eq!(rep_hist[1], 5); + }; + + // check that histograms are set properly in the write and read metadata + // also check that unencoded_byte_array_data_bytes is not set + assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some()); + let meta_data = file_metadata.row_groups[0].columns[0] + .meta_data + .as_ref() + .unwrap(); + assert!(meta_data.size_statistics.is_some()); + let size_stats = meta_data.size_statistics.as_ref().unwrap(); + assert!(size_stats.repetition_level_histogram.is_some()); + assert!(size_stats.definition_level_histogram.is_some()); + assert!(size_stats.unencoded_byte_array_data_bytes.is_none()); + check_def_hist(size_stats.definition_level_histogram.as_ref().unwrap()); + check_rep_hist(size_stats.repetition_level_histogram.as_ref().unwrap()); + + // check that the read metadata is also correct + let options = ReadOptionsBuilder::new().with_page_index().build(); + let reader = SerializedFileReader::new_with_options(file, options).unwrap(); + + let rfile_metadata = reader.metadata().file_metadata(); + assert_eq!(rfile_metadata.num_rows(), file_metadata.num_rows); + assert_eq!(reader.num_row_groups(), 1); + let rowgroup = reader.get_row_group(0).unwrap(); + assert_eq!(rowgroup.num_columns(), 1); + let column = rowgroup.metadata().column(0); + assert!(column.definition_level_histogram().is_some()); + assert!(column.repetition_level_histogram().is_some()); + assert!(column.unencoded_byte_array_data_bytes().is_none()); + check_def_hist(column.definition_level_histogram().unwrap()); + check_rep_hist(column.repetition_level_histogram().unwrap()); + + // check histogram in column index as well + assert!(reader.metadata().column_index().is_some()); + let column_index = reader.metadata().column_index().unwrap(); + assert_eq!(column_index.len(), 1); + assert_eq!(column_index[0].len(), 1); + let col_idx = if let Index::INT32(index) = &column_index[0][0] { + assert_eq!(index.indexes.len(), 1); + &index.indexes[0] + } else { + unreachable!() + }; + + check_def_hist(col_idx.definition_level_histogram().unwrap()); + check_rep_hist(col_idx.repetition_level_histogram().unwrap()); + + assert!(reader + .metadata() + .unencoded_byte_array_data_bytes() + .is_some()); + let unenc_sizes = reader.metadata().unencoded_byte_array_data_bytes().unwrap(); + assert_eq!(unenc_sizes.len(), 1); + assert_eq!(unenc_sizes[0].len(), 1); + assert!(unenc_sizes[0][0].is_none()); + } } diff --git a/parquet/src/format.rs b/parquet/src/format.rs index b210d6ec1b7e..6c93097b7359 100644 --- a/parquet/src/format.rs +++ b/parquet/src/format.rs @@ -117,12 +117,12 @@ impl ConvertedType { /// a list is converted into an optional field containing a repeated field for its /// values pub const LIST: ConvertedType = ConvertedType(3); - /// an enum is converted into a binary field + /// an enum is converted into a BYTE_ARRAY field pub const ENUM: ConvertedType = ConvertedType(4); /// A decimal value. /// - /// This may be used to annotate binary or fixed primitive types. The - /// underlying byte array stores the unscaled value encoded as two's + /// This may be used to annotate BYTE_ARRAY or FIXED_LEN_BYTE_ARRAY primitive + /// types. The underlying byte array stores the unscaled value encoded as two's /// complement using big-endian byte order (the most significant byte is the /// zeroth element). The value of the decimal is the value * 10^{-scale}. /// @@ -185,7 +185,7 @@ impl ConvertedType { pub const JSON: ConvertedType = ConvertedType(19); /// An embedded BSON document /// - /// A BSON document embedded within a single BINARY column. + /// A BSON document embedded within a single BYTE_ARRAY column. pub const BSON: ConvertedType = ConvertedType(20); /// An interval of time /// @@ -288,9 +288,9 @@ impl From<&ConvertedType> for i32 { pub struct FieldRepetitionType(pub i32); impl FieldRepetitionType { - /// This field is required (can not be null) and each record has exactly 1 value. + /// This field is required (can not be null) and each row has exactly 1 value. pub const REQUIRED: FieldRepetitionType = FieldRepetitionType(0); - /// The field is optional (can be null) and each record has 0 or 1 values. + /// The field is optional (can be null) and each row has 0 or 1 values. pub const OPTIONAL: FieldRepetitionType = FieldRepetitionType(1); /// The field is repeated and can contain 0 or more values pub const REPEATED: FieldRepetitionType = FieldRepetitionType(2); @@ -379,12 +379,15 @@ impl Encoding { pub const DELTA_BYTE_ARRAY: Encoding = Encoding(7); /// Dictionary encoding: the ids are encoded using the RLE encoding pub const RLE_DICTIONARY: Encoding = Encoding(8); - /// Encoding for floating-point data. + /// Encoding for fixed-width data (FLOAT, DOUBLE, INT32, INT64, FIXED_LEN_BYTE_ARRAY). /// K byte-streams are created where K is the size in bytes of the data type. - /// The individual bytes of an FP value are scattered to the corresponding stream and + /// The individual bytes of a value are scattered to the corresponding stream and /// the streams are concatenated. /// This itself does not reduce the size of the data but can lead to better compression /// afterwards. + /// + /// Added in 2.8 for FLOAT and DOUBLE. + /// Support for INT32, INT64 and FIXED_LEN_BYTE_ARRAY added in 2.11. pub const BYTE_STREAM_SPLIT: Encoding = Encoding(9); pub const ENUM_VALUES: &'static [Self] = &[ Self::PLAIN, @@ -634,6 +637,143 @@ impl From<&BoundaryOrder> for i32 { } } +// +// SizeStatistics +// + +/// A structure for capturing metadata for estimating the unencoded, +/// uncompressed size of data written. This is useful for readers to estimate +/// how much memory is needed to reconstruct data in their memory model and for +/// fine grained filter pushdown on nested structures (the histograms contained +/// in this structure can help determine the number of nulls at a particular +/// nesting level and maximum length of lists). +#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub struct SizeStatistics { + /// The number of physical bytes stored for BYTE_ARRAY data values assuming + /// no encoding. This is exclusive of the bytes needed to store the length of + /// each byte array. In other words, this field is equivalent to the `(size + /// of PLAIN-ENCODING the byte array values) - (4 bytes * number of values + /// written)`. To determine unencoded sizes of other types readers can use + /// schema information multiplied by the number of non-null and null values. + /// The number of null/non-null values can be inferred from the histograms + /// below. + /// + /// For example, if a column chunk is dictionary-encoded with dictionary + /// ["a", "bc", "cde"], and a data page contains the indices [0, 0, 1, 2], + /// then this value for that data page should be 7 (1 + 1 + 2 + 3). + /// + /// This field should only be set for types that use BYTE_ARRAY as their + /// physical type. + pub unencoded_byte_array_data_bytes: Option, + /// When present, there is expected to be one element corresponding to each + /// repetition (i.e. size=max repetition_level+1) where each element + /// represents the number of times the repetition level was observed in the + /// data. + /// + /// This field may be omitted if max_repetition_level is 0 without loss + /// of information. + /// + pub repetition_level_histogram: Option>, + /// Same as repetition_level_histogram except for definition levels. + /// + /// This field may be omitted if max_definition_level is 0 or 1 without + /// loss of information. + /// + pub definition_level_histogram: Option>, +} + +impl SizeStatistics { + pub fn new(unencoded_byte_array_data_bytes: F1, repetition_level_histogram: F2, definition_level_histogram: F3) -> SizeStatistics where F1: Into>, F2: Into>>, F3: Into>> { + SizeStatistics { + unencoded_byte_array_data_bytes: unencoded_byte_array_data_bytes.into(), + repetition_level_histogram: repetition_level_histogram.into(), + definition_level_histogram: definition_level_histogram.into(), + } + } +} + +impl crate::thrift::TSerializable for SizeStatistics { + fn read_from_in_protocol(i_prot: &mut T) -> thrift::Result { + i_prot.read_struct_begin()?; + let mut f_1: Option = None; + let mut f_2: Option> = None; + let mut f_3: Option> = None; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + 1 => { + let val = i_prot.read_i64()?; + f_1 = Some(val); + }, + 2 => { + let list_ident = i_prot.read_list_begin()?; + let mut val: Vec = Vec::with_capacity(list_ident.size as usize); + for _ in 0..list_ident.size { + let list_elem_0 = i_prot.read_i64()?; + val.push(list_elem_0); + } + i_prot.read_list_end()?; + f_2 = Some(val); + }, + 3 => { + let list_ident = i_prot.read_list_begin()?; + let mut val: Vec = Vec::with_capacity(list_ident.size as usize); + for _ in 0..list_ident.size { + let list_elem_1 = i_prot.read_i64()?; + val.push(list_elem_1); + } + i_prot.read_list_end()?; + f_3 = Some(val); + }, + _ => { + i_prot.skip(field_ident.field_type)?; + }, + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + let ret = SizeStatistics { + unencoded_byte_array_data_bytes: f_1, + repetition_level_histogram: f_2, + definition_level_histogram: f_3, + }; + Ok(ret) + } + fn write_to_out_protocol(&self, o_prot: &mut T) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("SizeStatistics"); + o_prot.write_struct_begin(&struct_ident)?; + if let Some(fld_var) = self.unencoded_byte_array_data_bytes { + o_prot.write_field_begin(&TFieldIdentifier::new("unencoded_byte_array_data_bytes", TType::I64, 1))?; + o_prot.write_i64(fld_var)?; + o_prot.write_field_end()? + } + if let Some(ref fld_var) = self.repetition_level_histogram { + o_prot.write_field_begin(&TFieldIdentifier::new("repetition_level_histogram", TType::List, 2))?; + o_prot.write_list_begin(&TListIdentifier::new(TType::I64, fld_var.len() as i32))?; + for e in fld_var { + o_prot.write_i64(*e)?; + } + o_prot.write_list_end()?; + o_prot.write_field_end()? + } + if let Some(ref fld_var) = self.definition_level_histogram { + o_prot.write_field_begin(&TFieldIdentifier::new("definition_level_histogram", TType::List, 3))?; + o_prot.write_list_begin(&TListIdentifier::new(TType::I64, fld_var.len() as i32))?; + for e in fld_var { + o_prot.write_i64(*e)?; + } + o_prot.write_list_end()?; + o_prot.write_field_end()? + } + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } +} + // // Statistics // @@ -1123,7 +1263,7 @@ impl crate::thrift::TSerializable for NullType { /// To maintain forward-compatibility in v1, implementations using this logical /// type must also set scale and precision on the annotated SchemaElement. /// -/// Allowed for physical types: INT32, INT64, FIXED, and BINARY +/// Allowed for physical types: INT32, INT64, FIXED_LEN_BYTE_ARRAY, and BYTE_ARRAY. #[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct DecimalType { pub scale: i32, @@ -1620,7 +1760,7 @@ impl crate::thrift::TSerializable for IntType { /// Embedded JSON logical type annotation /// -/// Allowed for physical types: BINARY +/// Allowed for physical types: BYTE_ARRAY #[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct JsonType { } @@ -1660,7 +1800,7 @@ impl crate::thrift::TSerializable for JsonType { /// Embedded BSON logical type annotation /// -/// Allowed for physical types: BINARY +/// Allowed for physical types: BYTE_ARRAY #[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct BsonType { } @@ -2146,7 +2286,12 @@ impl crate::thrift::TSerializable for SchemaElement { /// Data page header #[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct DataPageHeader { - /// Number of values, including NULLs, in this data page. * + /// Number of values, including NULLs, in this data page. + /// + /// If a OffsetIndex is present, a page must begin at a row + /// boundary (repetition_level = 0). Otherwise, pages may begin + /// within a row (repetition_level > 0). + /// pub num_values: i32, /// Encoding used for this data page * pub encoding: Encoding, @@ -2154,7 +2299,7 @@ pub struct DataPageHeader { pub definition_level_encoding: Encoding, /// Encoding used for repetition levels * pub repetition_level_encoding: Encoding, - /// Optional statistics for the data in this page* + /// Optional statistics for the data in this page * pub statistics: Option, } @@ -2390,21 +2535,24 @@ pub struct DataPageHeaderV2 { /// Number of NULL values, in this data page. /// Number of non-null = num_values - num_nulls which is also the number of values in the data section * pub num_nulls: i32, - /// Number of rows in this data page. which means pages change on record boundaries (r = 0) * + /// Number of rows in this data page. Every page must begin at a + /// row boundary (repetition_level = 0): rows must **not** be + /// split across page boundaries when using V2 data pages. + /// pub num_rows: i32, /// Encoding used for data in this page * pub encoding: Encoding, - /// length of the definition levels + /// Length of the definition levels pub definition_levels_byte_length: i32, - /// length of the repetition levels + /// Length of the repetition levels pub repetition_levels_byte_length: i32, - /// whether the values are compressed. + /// Whether the values are compressed. /// Which means the section of the page between /// definition_levels_byte_length + repetition_levels_byte_length + 1 and compressed_page_size (included) /// is compressed with the compression_codec. /// If missing it is considered compressed pub is_compressed: Option, - /// optional statistics for the data in this page * + /// Optional statistics for the data in this page * pub statistics: Option, } @@ -3207,10 +3355,10 @@ impl crate::thrift::TSerializable for KeyValue { // SortingColumn // -/// Wrapper struct to specify sort order +/// Sort order within a RowGroup of a leaf column #[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct SortingColumn { - /// The column index (in this row group) * + /// The ordinal position of the column (in this row group) * pub column_idx: i32, /// If true, indicates this column is sorted in descending order. * pub descending: bool, @@ -3417,10 +3565,15 @@ pub struct ColumnMetaData { /// Writers should write this field so readers can read the bloom filter /// in a single I/O. pub bloom_filter_length: Option, + /// Optional statistics to help estimate total memory when converted to in-memory + /// representations. The histograms contained in these statistics can + /// also be useful in some cases for more fine-grained nullability/list length + /// filter pushdown. + pub size_statistics: Option, } impl ColumnMetaData { - pub fn new(type_: Type, encodings: Vec, path_in_schema: Vec, codec: CompressionCodec, num_values: i64, total_uncompressed_size: i64, total_compressed_size: i64, key_value_metadata: F8, data_page_offset: i64, index_page_offset: F10, dictionary_page_offset: F11, statistics: F12, encoding_stats: F13, bloom_filter_offset: F14, bloom_filter_length: F15) -> ColumnMetaData where F8: Into>>, F10: Into>, F11: Into>, F12: Into>, F13: Into>>, F14: Into>, F15: Into> { + pub fn new(type_: Type, encodings: Vec, path_in_schema: Vec, codec: CompressionCodec, num_values: i64, total_uncompressed_size: i64, total_compressed_size: i64, key_value_metadata: F8, data_page_offset: i64, index_page_offset: F10, dictionary_page_offset: F11, statistics: F12, encoding_stats: F13, bloom_filter_offset: F14, bloom_filter_length: F15, size_statistics: F16) -> ColumnMetaData where F8: Into>>, F10: Into>, F11: Into>, F12: Into>, F13: Into>>, F14: Into>, F15: Into>, F16: Into> { ColumnMetaData { type_, encodings, @@ -3437,6 +3590,7 @@ impl ColumnMetaData { encoding_stats: encoding_stats.into(), bloom_filter_offset: bloom_filter_offset.into(), bloom_filter_length: bloom_filter_length.into(), + size_statistics: size_statistics.into(), } } } @@ -3459,6 +3613,7 @@ impl crate::thrift::TSerializable for ColumnMetaData { let mut f_13: Option> = None; let mut f_14: Option = None; let mut f_15: Option = None; + let mut f_16: Option = None; loop { let field_ident = i_prot.read_field_begin()?; if field_ident.field_type == TType::Stop { @@ -3474,8 +3629,8 @@ impl crate::thrift::TSerializable for ColumnMetaData { let list_ident = i_prot.read_list_begin()?; let mut val: Vec = Vec::with_capacity(list_ident.size as usize); for _ in 0..list_ident.size { - let list_elem_0 = Encoding::read_from_in_protocol(i_prot)?; - val.push(list_elem_0); + let list_elem_2 = Encoding::read_from_in_protocol(i_prot)?; + val.push(list_elem_2); } i_prot.read_list_end()?; f_2 = Some(val); @@ -3484,8 +3639,8 @@ impl crate::thrift::TSerializable for ColumnMetaData { let list_ident = i_prot.read_list_begin()?; let mut val: Vec = Vec::with_capacity(list_ident.size as usize); for _ in 0..list_ident.size { - let list_elem_1 = i_prot.read_string()?; - val.push(list_elem_1); + let list_elem_3 = i_prot.read_string()?; + val.push(list_elem_3); } i_prot.read_list_end()?; f_3 = Some(val); @@ -3510,8 +3665,8 @@ impl crate::thrift::TSerializable for ColumnMetaData { let list_ident = i_prot.read_list_begin()?; let mut val: Vec = Vec::with_capacity(list_ident.size as usize); for _ in 0..list_ident.size { - let list_elem_2 = KeyValue::read_from_in_protocol(i_prot)?; - val.push(list_elem_2); + let list_elem_4 = KeyValue::read_from_in_protocol(i_prot)?; + val.push(list_elem_4); } i_prot.read_list_end()?; f_8 = Some(val); @@ -3536,8 +3691,8 @@ impl crate::thrift::TSerializable for ColumnMetaData { let list_ident = i_prot.read_list_begin()?; let mut val: Vec = Vec::with_capacity(list_ident.size as usize); for _ in 0..list_ident.size { - let list_elem_3 = PageEncodingStats::read_from_in_protocol(i_prot)?; - val.push(list_elem_3); + let list_elem_5 = PageEncodingStats::read_from_in_protocol(i_prot)?; + val.push(list_elem_5); } i_prot.read_list_end()?; f_13 = Some(val); @@ -3550,6 +3705,10 @@ impl crate::thrift::TSerializable for ColumnMetaData { let val = i_prot.read_i32()?; f_15 = Some(val); }, + 16 => { + let val = SizeStatistics::read_from_in_protocol(i_prot)?; + f_16 = Some(val); + }, _ => { i_prot.skip(field_ident.field_type)?; }, @@ -3581,6 +3740,7 @@ impl crate::thrift::TSerializable for ColumnMetaData { encoding_stats: f_13, bloom_filter_offset: f_14, bloom_filter_length: f_15, + size_statistics: f_16, }; Ok(ret) } @@ -3662,6 +3822,11 @@ impl crate::thrift::TSerializable for ColumnMetaData { o_prot.write_i32(fld_var)?; o_prot.write_field_end()? } + if let Some(ref fld_var) = self.size_statistics { + o_prot.write_field_begin(&TFieldIdentifier::new("size_statistics", TType::Struct, 16))?; + fld_var.write_to_out_protocol(o_prot)?; + o_prot.write_field_end()? + } o_prot.write_field_stop()?; o_prot.write_struct_end() } @@ -3741,8 +3906,8 @@ impl crate::thrift::TSerializable for EncryptionWithColumnKey { let list_ident = i_prot.read_list_begin()?; let mut val: Vec = Vec::with_capacity(list_ident.size as usize); for _ in 0..list_ident.size { - let list_elem_4 = i_prot.read_string()?; - val.push(list_elem_4); + let list_elem_6 = i_prot.read_string()?; + val.push(list_elem_6); } i_prot.read_list_end()?; f_1 = Some(val); @@ -3881,11 +4046,19 @@ pub struct ColumnChunk { /// metadata. This path is relative to the current file. /// pub file_path: Option, - /// Byte offset in file_path to the ColumnMetaData * + /// Deprecated: Byte offset in file_path to the ColumnMetaData + /// + /// Past use of this field has been inconsistent, with some implementations + /// using it to point to the ColumnMetaData and some using it to point to + /// the first page in the column chunk. In many cases, the ColumnMetaData at this + /// location is wrong. This field is now deprecated and should not be used. + /// Writers should set this field to 0 if no ColumnMetaData has been written outside + /// the footer. pub file_offset: i64, - /// Column metadata for this chunk. This is the same content as what is at - /// file_path/file_offset. Having it here has it replicated in the file - /// metadata. + /// Column metadata for this chunk. Some writers may also replicate this at the + /// location pointed to by file_path/file_offset. + /// Note: while marked as optional, this field is in fact required by most major + /// Parquet implementations. As such, writers MUST populate this field. /// pub meta_data: Option, /// File offset of ColumnChunk's OffsetIndex * @@ -4107,8 +4280,8 @@ impl crate::thrift::TSerializable for RowGroup { let list_ident = i_prot.read_list_begin()?; let mut val: Vec = Vec::with_capacity(list_ident.size as usize); for _ in 0..list_ident.size { - let list_elem_5 = ColumnChunk::read_from_in_protocol(i_prot)?; - val.push(list_elem_5); + let list_elem_7 = ColumnChunk::read_from_in_protocol(i_prot)?; + val.push(list_elem_7); } i_prot.read_list_end()?; f_1 = Some(val); @@ -4125,8 +4298,8 @@ impl crate::thrift::TSerializable for RowGroup { let list_ident = i_prot.read_list_begin()?; let mut val: Vec = Vec::with_capacity(list_ident.size as usize); for _ in 0..list_ident.size { - let list_elem_6 = SortingColumn::read_from_in_protocol(i_prot)?; - val.push(list_elem_6); + let list_elem_8 = SortingColumn::read_from_in_protocol(i_prot)?; + val.push(list_elem_8); } i_prot.read_list_end()?; f_4 = Some(val); @@ -4331,8 +4504,9 @@ pub struct PageLocation { /// Size of the page, including header. Sum of compressed_page_size and header /// length pub compressed_page_size: i32, - /// Index within the RowGroup of the first row of the page; this means pages - /// change on record boundaries (r = 0). + /// Index within the RowGroup of the first row of the page. When an + /// OffsetIndex is present, pages must begin on row boundaries + /// (repetition_level = 0). pub first_row_index: i64, } @@ -4409,17 +4583,28 @@ impl crate::thrift::TSerializable for PageLocation { // OffsetIndex // +/// Optional offsets for each data page in a ColumnChunk. +/// +/// Forms part of the page index, along with ColumnIndex. +/// +/// OffsetIndex may be present even if ColumnIndex is not. #[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct OffsetIndex { /// PageLocations, ordered by increasing PageLocation.offset. It is required /// that page_locations\[i\].first_row_index < page_locations\[i+1\].first_row_index. pub page_locations: Vec, + /// Unencoded/uncompressed size for BYTE_ARRAY types. + /// + /// See documention for unencoded_byte_array_data_bytes in SizeStatistics for + /// more details on this field. + pub unencoded_byte_array_data_bytes: Option>, } impl OffsetIndex { - pub fn new(page_locations: Vec) -> OffsetIndex { + pub fn new(page_locations: Vec, unencoded_byte_array_data_bytes: F2) -> OffsetIndex where F2: Into>> { OffsetIndex { page_locations, + unencoded_byte_array_data_bytes: unencoded_byte_array_data_bytes.into(), } } } @@ -4428,6 +4613,7 @@ impl crate::thrift::TSerializable for OffsetIndex { fn read_from_in_protocol(i_prot: &mut T) -> thrift::Result { i_prot.read_struct_begin()?; let mut f_1: Option> = None; + let mut f_2: Option> = None; loop { let field_ident = i_prot.read_field_begin()?; if field_ident.field_type == TType::Stop { @@ -4439,12 +4625,22 @@ impl crate::thrift::TSerializable for OffsetIndex { let list_ident = i_prot.read_list_begin()?; let mut val: Vec = Vec::with_capacity(list_ident.size as usize); for _ in 0..list_ident.size { - let list_elem_7 = PageLocation::read_from_in_protocol(i_prot)?; - val.push(list_elem_7); + let list_elem_9 = PageLocation::read_from_in_protocol(i_prot)?; + val.push(list_elem_9); } i_prot.read_list_end()?; f_1 = Some(val); }, + 2 => { + let list_ident = i_prot.read_list_begin()?; + let mut val: Vec = Vec::with_capacity(list_ident.size as usize); + for _ in 0..list_ident.size { + let list_elem_10 = i_prot.read_i64()?; + val.push(list_elem_10); + } + i_prot.read_list_end()?; + f_2 = Some(val); + }, _ => { i_prot.skip(field_ident.field_type)?; }, @@ -4455,6 +4651,7 @@ impl crate::thrift::TSerializable for OffsetIndex { verify_required_field_exists("OffsetIndex.page_locations", &f_1)?; let ret = OffsetIndex { page_locations: f_1.expect("auto-generated code should have checked for presence of required fields"), + unencoded_byte_array_data_bytes: f_2, }; Ok(ret) } @@ -4468,6 +4665,15 @@ impl crate::thrift::TSerializable for OffsetIndex { } o_prot.write_list_end()?; o_prot.write_field_end()?; + if let Some(ref fld_var) = self.unencoded_byte_array_data_bytes { + o_prot.write_field_begin(&TFieldIdentifier::new("unencoded_byte_array_data_bytes", TType::List, 2))?; + o_prot.write_list_begin(&TListIdentifier::new(TType::I64, fld_var.len() as i32))?; + for e in fld_var { + o_prot.write_i64(*e)?; + } + o_prot.write_list_end()?; + o_prot.write_field_end()? + } o_prot.write_field_stop()?; o_prot.write_struct_end() } @@ -4477,8 +4683,14 @@ impl crate::thrift::TSerializable for OffsetIndex { // ColumnIndex // -/// Description for ColumnIndex. -/// Each ``\[i\] refers to the page at OffsetIndex.page_locations\[i\] +/// Optional statistics for each data page in a ColumnChunk. +/// +/// Forms part the page index, along with OffsetIndex. +/// +/// If this structure is present, OffsetIndex must also be present. +/// +/// For each field in this structure, ``\[i\] refers to the page at +/// OffsetIndex.page_locations\[i\] #[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct ColumnIndex { /// A list of Boolean values to determine the validity of the corresponding @@ -4504,16 +4716,33 @@ pub struct ColumnIndex { pub boundary_order: BoundaryOrder, /// A list containing the number of null values for each page * pub null_counts: Option>, + /// Contains repetition level histograms for each page + /// concatenated together. The repetition_level_histogram field on + /// SizeStatistics contains more details. + /// + /// When present the length should always be (number of pages * + /// (max_repetition_level + 1)) elements. + /// + /// Element 0 is the first element of the histogram for the first page. + /// Element (max_repetition_level + 1) is the first element of the histogram + /// for the second page. + /// + pub repetition_level_histograms: Option>, + /// Same as repetition_level_histograms except for definitions levels. + /// + pub definition_level_histograms: Option>, } impl ColumnIndex { - pub fn new(null_pages: Vec, min_values: Vec>, max_values: Vec>, boundary_order: BoundaryOrder, null_counts: F5) -> ColumnIndex where F5: Into>> { + pub fn new(null_pages: Vec, min_values: Vec>, max_values: Vec>, boundary_order: BoundaryOrder, null_counts: F5, repetition_level_histograms: F6, definition_level_histograms: F7) -> ColumnIndex where F5: Into>>, F6: Into>>, F7: Into>> { ColumnIndex { null_pages, min_values, max_values, boundary_order, null_counts: null_counts.into(), + repetition_level_histograms: repetition_level_histograms.into(), + definition_level_histograms: definition_level_histograms.into(), } } } @@ -4526,6 +4755,8 @@ impl crate::thrift::TSerializable for ColumnIndex { let mut f_3: Option>> = None; let mut f_4: Option = None; let mut f_5: Option> = None; + let mut f_6: Option> = None; + let mut f_7: Option> = None; loop { let field_ident = i_prot.read_field_begin()?; if field_ident.field_type == TType::Stop { @@ -4537,8 +4768,8 @@ impl crate::thrift::TSerializable for ColumnIndex { let list_ident = i_prot.read_list_begin()?; let mut val: Vec = Vec::with_capacity(list_ident.size as usize); for _ in 0..list_ident.size { - let list_elem_8 = i_prot.read_bool()?; - val.push(list_elem_8); + let list_elem_11 = i_prot.read_bool()?; + val.push(list_elem_11); } i_prot.read_list_end()?; f_1 = Some(val); @@ -4547,8 +4778,8 @@ impl crate::thrift::TSerializable for ColumnIndex { let list_ident = i_prot.read_list_begin()?; let mut val: Vec> = Vec::with_capacity(list_ident.size as usize); for _ in 0..list_ident.size { - let list_elem_9 = i_prot.read_bytes()?; - val.push(list_elem_9); + let list_elem_12 = i_prot.read_bytes()?; + val.push(list_elem_12); } i_prot.read_list_end()?; f_2 = Some(val); @@ -4557,8 +4788,8 @@ impl crate::thrift::TSerializable for ColumnIndex { let list_ident = i_prot.read_list_begin()?; let mut val: Vec> = Vec::with_capacity(list_ident.size as usize); for _ in 0..list_ident.size { - let list_elem_10 = i_prot.read_bytes()?; - val.push(list_elem_10); + let list_elem_13 = i_prot.read_bytes()?; + val.push(list_elem_13); } i_prot.read_list_end()?; f_3 = Some(val); @@ -4571,12 +4802,32 @@ impl crate::thrift::TSerializable for ColumnIndex { let list_ident = i_prot.read_list_begin()?; let mut val: Vec = Vec::with_capacity(list_ident.size as usize); for _ in 0..list_ident.size { - let list_elem_11 = i_prot.read_i64()?; - val.push(list_elem_11); + let list_elem_14 = i_prot.read_i64()?; + val.push(list_elem_14); } i_prot.read_list_end()?; f_5 = Some(val); }, + 6 => { + let list_ident = i_prot.read_list_begin()?; + let mut val: Vec = Vec::with_capacity(list_ident.size as usize); + for _ in 0..list_ident.size { + let list_elem_15 = i_prot.read_i64()?; + val.push(list_elem_15); + } + i_prot.read_list_end()?; + f_6 = Some(val); + }, + 7 => { + let list_ident = i_prot.read_list_begin()?; + let mut val: Vec = Vec::with_capacity(list_ident.size as usize); + for _ in 0..list_ident.size { + let list_elem_16 = i_prot.read_i64()?; + val.push(list_elem_16); + } + i_prot.read_list_end()?; + f_7 = Some(val); + }, _ => { i_prot.skip(field_ident.field_type)?; }, @@ -4594,6 +4845,8 @@ impl crate::thrift::TSerializable for ColumnIndex { max_values: f_3.expect("auto-generated code should have checked for presence of required fields"), boundary_order: f_4.expect("auto-generated code should have checked for presence of required fields"), null_counts: f_5, + repetition_level_histograms: f_6, + definition_level_histograms: f_7, }; Ok(ret) } @@ -4633,6 +4886,24 @@ impl crate::thrift::TSerializable for ColumnIndex { o_prot.write_list_end()?; o_prot.write_field_end()? } + if let Some(ref fld_var) = self.repetition_level_histograms { + o_prot.write_field_begin(&TFieldIdentifier::new("repetition_level_histograms", TType::List, 6))?; + o_prot.write_list_begin(&TListIdentifier::new(TType::I64, fld_var.len() as i32))?; + for e in fld_var { + o_prot.write_i64(*e)?; + } + o_prot.write_list_end()?; + o_prot.write_field_end()? + } + if let Some(ref fld_var) = self.definition_level_histograms { + o_prot.write_field_begin(&TFieldIdentifier::new("definition_level_histograms", TType::List, 7))?; + o_prot.write_list_begin(&TListIdentifier::new(TType::I64, fld_var.len() as i32))?; + for e in fld_var { + o_prot.write_i64(*e)?; + } + o_prot.write_list_end()?; + o_prot.write_field_end()? + } o_prot.write_field_stop()?; o_prot.write_struct_end() } @@ -4992,8 +5263,8 @@ impl crate::thrift::TSerializable for FileMetaData { let list_ident = i_prot.read_list_begin()?; let mut val: Vec = Vec::with_capacity(list_ident.size as usize); for _ in 0..list_ident.size { - let list_elem_12 = SchemaElement::read_from_in_protocol(i_prot)?; - val.push(list_elem_12); + let list_elem_17 = SchemaElement::read_from_in_protocol(i_prot)?; + val.push(list_elem_17); } i_prot.read_list_end()?; f_2 = Some(val); @@ -5006,8 +5277,8 @@ impl crate::thrift::TSerializable for FileMetaData { let list_ident = i_prot.read_list_begin()?; let mut val: Vec = Vec::with_capacity(list_ident.size as usize); for _ in 0..list_ident.size { - let list_elem_13 = RowGroup::read_from_in_protocol(i_prot)?; - val.push(list_elem_13); + let list_elem_18 = RowGroup::read_from_in_protocol(i_prot)?; + val.push(list_elem_18); } i_prot.read_list_end()?; f_4 = Some(val); @@ -5016,8 +5287,8 @@ impl crate::thrift::TSerializable for FileMetaData { let list_ident = i_prot.read_list_begin()?; let mut val: Vec = Vec::with_capacity(list_ident.size as usize); for _ in 0..list_ident.size { - let list_elem_14 = KeyValue::read_from_in_protocol(i_prot)?; - val.push(list_elem_14); + let list_elem_19 = KeyValue::read_from_in_protocol(i_prot)?; + val.push(list_elem_19); } i_prot.read_list_end()?; f_5 = Some(val); @@ -5030,8 +5301,8 @@ impl crate::thrift::TSerializable for FileMetaData { let list_ident = i_prot.read_list_begin()?; let mut val: Vec = Vec::with_capacity(list_ident.size as usize); for _ in 0..list_ident.size { - let list_elem_15 = ColumnOrder::read_from_in_protocol(i_prot)?; - val.push(list_elem_15); + let list_elem_20 = ColumnOrder::read_from_in_protocol(i_prot)?; + val.push(list_elem_20); } i_prot.read_list_end()?; f_7 = Some(val);