Skip to content

Commit

Permalink
Add ParquetMetadata::memory_size size estimation
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Jun 27, 2024
1 parent 0a4d8a1 commit 29abb9e
Show file tree
Hide file tree
Showing 4 changed files with 356 additions and 1 deletion.
18 changes: 18 additions & 0 deletions parquet/src/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,13 @@ pub(crate) mod private {

/// Return the value as an mutable Any to allow for downcasts without transmutation
fn as_mut_any(&mut self) -> &mut dyn std::any::Any;

/// Returns the number of bytes of memory this instance uses on the heap.
///
/// Defaults to none (0)
fn heap_size(&self) -> usize {
0
}
}

impl ParquetValueType for bool {
Expand Down Expand Up @@ -968,6 +975,13 @@ pub(crate) mod private {
fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
self
}

fn heap_size(&self) -> usize {
// note: this is an estimate, not exact, so just return the size
// of the actual data used, don't try to handle the fact that it may
// be shared.
self.data.as_ref().map(|data| data.len()).unwrap_or(0)
}
}

impl ParquetValueType for super::FixedLenByteArray {
Expand Down Expand Up @@ -1054,6 +1068,10 @@ pub(crate) mod private {
fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
self
}

fn heap_size(&self) -> usize {
self.0.heap_size()
}
}
}

Expand Down
213 changes: 213 additions & 0 deletions parquet/src/file/metadata/memory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Memory calculations for [`ParquetMetadata::memory_size`]
//!
//! [`ParquetMetadata::memory_size`]: crate::file::metadata::ParquetMetaData::memory_size
use crate::basic::{ColumnOrder, Compression, Encoding, PageType};
use crate::data_type::private::ParquetValueType;
use crate::file::metadata::{ColumnChunkMetaData, FileMetaData, KeyValue, RowGroupMetaData};
use crate::file::page_encoding_stats::PageEncodingStats;
use crate::file::page_index::index::{Index, NativeIndex, PageIndex};
use crate::file::statistics::{Statistics, ValueStatistics};
use crate::format::{BoundaryOrder, PageLocation, SortingColumn};
use std::sync::Arc;

/// Trait for calculating the size of various containers
pub(crate) trait HeapSize {
/// Return the size of any bytes allocated on the heap by this object,
/// including heap memory in those structures
///
/// Note that the size of the type itself is not included in the result --
/// instead, that size is added by the caller (e.g. container).
fn heap_size(&self) -> usize;
}

impl<T: HeapSize> HeapSize for Vec<T> {
fn heap_size(&self) -> usize {
let item_size = std::mem::size_of::<T>();
// account for the contents of the Vec
(self.capacity() * item_size) +
// add any heap allocations by contents
self.iter().map(|t| t.heap_size()).sum::<usize>()
}
}

impl<T: HeapSize> HeapSize for Arc<T> {
fn heap_size(&self) -> usize {
self.as_ref().heap_size()
}
}

impl<T: HeapSize> HeapSize for Option<T> {
fn heap_size(&self) -> usize {
self.as_ref().map(|inner| inner.heap_size()).unwrap_or(0)
}
}

impl HeapSize for String {
fn heap_size(&self) -> usize {
self.capacity()
}
}

impl HeapSize for FileMetaData {
fn heap_size(&self) -> usize {
self.created_by.heap_size()
+ self.key_value_metadata.heap_size()
+ self.schema_descr.heap_size()
+ self.column_orders.heap_size()
}
}

impl HeapSize for KeyValue {
fn heap_size(&self) -> usize {
self.key.heap_size() + self.value.heap_size()
}
}

impl HeapSize for RowGroupMetaData {
fn heap_size(&self) -> usize {
// don't count schema_descr here because it is already
// counted in FileMetaData
self.columns.heap_size() + self.sorting_columns.heap_size()
}
}

impl HeapSize for ColumnChunkMetaData {
fn heap_size(&self) -> usize {
// don't count column_descr here because it is already counted in
// FileMetaData
self.encodings.heap_size()
+ self.file_path.heap_size()
+ self.compression.heap_size()
+ self.statistics.heap_size()
+ self.encoding_stats.heap_size()
}
}

impl HeapSize for Encoding {
fn heap_size(&self) -> usize {
0 // no heap allocations
}
}

impl HeapSize for PageEncodingStats {
fn heap_size(&self) -> usize {
self.page_type.heap_size() + self.encoding.heap_size()
}
}

impl HeapSize for SortingColumn {
fn heap_size(&self) -> usize {
0 // no heap allocations
}
}
impl HeapSize for Compression {
fn heap_size(&self) -> usize {
0 // no heap allocations
}
}

impl HeapSize for PageType {
fn heap_size(&self) -> usize {
0 // no heap allocations
}
}
impl HeapSize for Statistics {
fn heap_size(&self) -> usize {
match self {
Statistics::Boolean(value_statistics) => value_statistics.heap_size(),
Statistics::Int32(value_statistics) => value_statistics.heap_size(),
Statistics::Int64(value_statistics) => value_statistics.heap_size(),
Statistics::Int96(value_statistics) => value_statistics.heap_size(),
Statistics::Float(value_statistics) => value_statistics.heap_size(),
Statistics::Double(value_statistics) => value_statistics.heap_size(),
Statistics::ByteArray(value_statistics) => value_statistics.heap_size(),
Statistics::FixedLenByteArray(value_statistics) => value_statistics.heap_size(),
}
}
}

impl HeapSize for Index {
fn heap_size(&self) -> usize {
match self {
Index::NONE => 0,
Index::BOOLEAN(native_index) => native_index.heap_size(),
Index::INT32(native_index) => native_index.heap_size(),
Index::INT64(native_index) => native_index.heap_size(),
Index::INT96(native_index) => native_index.heap_size(),
Index::FLOAT(native_index) => native_index.heap_size(),
Index::DOUBLE(native_index) => native_index.heap_size(),
Index::BYTE_ARRAY(native_index) => native_index.heap_size(),
Index::FIXED_LEN_BYTE_ARRAY(native_index) => native_index.heap_size(),
}
}
}

impl<T: ParquetValueType> HeapSize for NativeIndex<T> {
fn heap_size(&self) -> usize {
let Self {
indexes,
boundary_order,
} = self;
indexes.heap_size() + boundary_order.heap_size()
}
}

impl<T: ParquetValueType> HeapSize for PageIndex<T> {
fn heap_size(&self) -> usize {
self.min.heap_size() + self.max.heap_size() + self.null_count.heap_size()
}
}

impl<T: ParquetValueType> HeapSize for ValueStatistics<T> {
fn heap_size(&self) -> usize {
self.min().heap_size() + self.max().heap_size()
}
}

// Note this impl gets most primitive types like bool, i32, etc
impl<T: ParquetValueType> HeapSize for T {
fn heap_size(&self) -> usize {
self.heap_size()
}
}

impl HeapSize for usize {
fn heap_size(&self) -> usize {
0 // no heap allocations
}
}

impl HeapSize for BoundaryOrder {
fn heap_size(&self) -> usize {
0 // no heap allocations
}
}

impl HeapSize for PageLocation {
fn heap_size(&self) -> usize {
0 // no heap allocations
}
}

impl HeapSize for ColumnOrder {
fn heap_size(&self) -> usize {
0 // no heap allocations in ColumnOrder
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
//! * [`ColumnChunkMetaData`]: Metadata for each column chunk (primitive leaf)
//! within a Row Group including encoding and compression information,
//! number of values, statistics, etc.
mod memory;

use std::ops::Range;
use std::sync::Arc;

Expand All @@ -39,6 +41,7 @@ use crate::format::{

use crate::basic::{ColumnOrder, Compression, Encoding, Type};
use crate::errors::{ParquetError, Result};
pub(crate) use crate::file::metadata::memory::HeapSize;
use crate::file::page_encoding_stats::{self, PageEncodingStats};
use crate::file::page_index::index::Index;
use crate::file::statistics::{self, Statistics};
Expand Down Expand Up @@ -176,6 +179,28 @@ impl ParquetMetaData {
self.offset_index.as_ref()
}

/// Estimate of the bytes allocated to store `ParquetMetadata`
///
/// # Notes:
///
/// 1. Includes size of self
///
/// 2. Includes heap memory for sub fields such as [`FileMetaData`] and
/// [`RowGroupMetaData`].
///
/// 2. Includes memory from shared pointers (e.g. [`SchemaDescPtr`]). This
/// means `memory_size` will over estimate the memory size if such pointers
/// are shared.
///
/// 3. Does not include any allocator overheads
pub fn memory_size(&self) -> usize {
std::mem::size_of::<Self>()
+ self.file_metadata.heap_size()
+ self.row_groups.heap_size()
+ self.column_index.heap_size()
+ self.offset_index.heap_size()
}

/// Override the column index
#[cfg(feature = "arrow")]
pub(crate) fn set_column_index(&mut self, index: Option<ParquetColumnIndex>) {
Expand Down Expand Up @@ -1034,7 +1059,8 @@ impl OffsetIndexBuilder {
#[cfg(test)]
mod tests {
use super::*;
use crate::basic::PageType;
use crate::basic::{PageType, SortOrder};
use crate::file::page_index::index::NativeIndex;

#[test]
fn test_row_group_metadata_thrift_conversion() {
Expand Down Expand Up @@ -1227,6 +1253,68 @@ mod tests {
assert_eq!(compressed_size_res, compressed_size_exp);
}

#[test]
fn test_memory_size() {
let schema_descr = get_test_schema_descr();

let columns = schema_descr
.columns()
.iter()
.map(|column_descr| ColumnChunkMetaData::builder(column_descr.clone()).build())
.collect::<Result<Vec<_>>>()
.unwrap();
let row_group_meta = RowGroupMetaData::builder(schema_descr.clone())
.set_num_rows(1000)
.set_column_metadata(columns)
.build()
.unwrap();
let row_group_meta = vec![row_group_meta];

let version = 2;
let num_rows = 1000;
let created_by = Some(String::from("test harness"));
let key_value_metadata = Some(vec![KeyValue::new(
String::from("Foo"),
Some(String::from("bar")),
)]);
let column_orders = Some(vec![
ColumnOrder::UNDEFINED,
ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::UNSIGNED),
]);
let file_metadata = FileMetaData::new(
version,
num_rows,
created_by,
key_value_metadata,
schema_descr,
column_orders,
);
let parquet_meta = ParquetMetaData::new(file_metadata.clone(), row_group_meta.clone());
let base_expected_size = 12345;
assert_eq!(parquet_meta.memory_size(), base_expected_size);

let mut column_index = ColumnIndexBuilder::new();
column_index.append(false, vec![1u8], vec![2u8, 3u8], 4);
let column_index = column_index.build_to_thrift();
let native_index = NativeIndex::<bool>::try_new(column_index).unwrap();

// Now, add in OffsetIndex
let parquet_meta = ParquetMetaData::new_with_page_index(
file_metadata,
row_group_meta,
Some(vec![vec![Index::BOOLEAN(native_index)]]),
Some(vec![vec![
vec![PageLocation::new(1, 2, 3)],
vec![PageLocation::new(1, 2, 3)],
]]),
);

let bigger_expected_size = 12345;
// more set fields means more memory usage
assert!(bigger_expected_size > base_expected_size);
assert_eq!(parquet_meta.memory_size(), bigger_expected_size);
}

/// Returns sample schema descriptor so we can create column metadata.
fn get_test_schema_descr() -> SchemaDescPtr {
let schema = SchemaType::group_type_builder("schema")
Expand Down
Loading

0 comments on commit 29abb9e

Please sign in to comment.