Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable truncation of binary statistics columns #5076

Merged
merged 7 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
228 changes: 211 additions & 17 deletions parquet/src/column/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -636,8 +636,16 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
self.column_index_builder.append(
null_page,
self.truncate_min_value(stat.min_bytes()),
self.truncate_max_value(stat.max_bytes()),
self.truncate_min_value(
self.props.column_index_truncate_length(),
stat.min_bytes(),
)
.0,
self.truncate_max_value(
self.props.column_index_truncate_length(),
stat.max_bytes(),
)
.0,
self.page_metrics.num_page_nulls as i64,
);
}
Expand All @@ -658,26 +666,26 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
.append_row_count(self.page_metrics.num_buffered_rows as i64);
}

fn truncate_min_value(&self, data: &[u8]) -> Vec<u8> {
self.props
.column_index_truncate_length()
fn truncate_min_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
truncation_length
.filter(|l| data.len() > *l)
.and_then(|l| match str::from_utf8(data) {
Ok(str_data) => truncate_utf8(str_data, l),
Err(_) => Some(data[..l].to_vec()),
})
.unwrap_or_else(|| data.to_vec())
.map(|truncated| (truncated, true))
.unwrap_or_else(|| (data.to_vec(), false))
}

fn truncate_max_value(&self, data: &[u8]) -> Vec<u8> {
self.props
.column_index_truncate_length()
fn truncate_max_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
truncation_length
.filter(|l| data.len() > *l)
.and_then(|l| match str::from_utf8(data) {
Ok(str_data) => truncate_utf8(str_data, l).and_then(increment_utf8),
Err(_) => increment(data[..l].to_vec()),
})
.unwrap_or_else(|| data.to_vec())
.map(|truncated| (truncated, true))
.unwrap_or_else(|| (data.to_vec(), false))
}

/// Adds data page.
Expand Down Expand Up @@ -856,20 +864,64 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
.set_dictionary_page_offset(dict_page_offset);

if self.statistics_enabled != EnabledStatistics::None {
let backwards_compatible_min_max = self.descr.sort_order().is_signed();

let statistics = ValueStatistics::<E::T>::new(
self.column_metrics.min_column_value.clone(),
self.column_metrics.max_column_value.clone(),
self.column_metrics.column_distinct_count,
self.column_metrics.num_column_nulls,
false,
);
)
.with_backwards_compatible_min_max(backwards_compatible_min_max)
.into();

let statistics = match statistics {
Statistics::ByteArray(stats) if stats.has_min_max_set() => {
let (min, did_truncate_min) = self.truncate_min_value(
self.props.statistics_truncate_length(),
stats.min_bytes(),
);
let (max, did_truncate_max) = self.truncate_max_value(
self.props.statistics_truncate_length(),
stats.max_bytes(),
);
Statistics::ByteArray(
ValueStatistics::new(
Some(min.into()),
Some(max.into()),
stats.distinct_count(),
stats.null_count(),
backwards_compatible_min_max,
)
.with_max_is_exact(!did_truncate_max)
.with_min_is_exact(!did_truncate_min),
)
}
Statistics::FixedLenByteArray(stats) if stats.has_min_max_set() => {
let (min, did_truncate_min) = self.truncate_min_value(
self.props.statistics_truncate_length(),
stats.min_bytes(),
);
let (max, did_truncate_max) = self.truncate_max_value(
self.props.statistics_truncate_length(),
stats.max_bytes(),
);
Statistics::FixedLenByteArray(
ValueStatistics::new(
Some(min.into()),
Some(max.into()),
stats.distinct_count(),
stats.null_count(),
backwards_compatible_min_max,
)
.with_max_is_exact(!did_truncate_max)
.with_min_is_exact(!did_truncate_min),
)
}
stats => stats,
};

// Some common readers only support the deprecated statistics
// format so we also write them out if possible
// See https://github.com/apache/arrow-rs/issues/799
let statistics = statistics
.with_backwards_compatible_min_max(self.descr.sort_order().is_signed())
.into();
builder = builder.set_statistics(statistics);
}

Expand Down Expand Up @@ -2612,6 +2664,148 @@ mod tests {
}
}

#[test]
fn test_statistics_truncating_byte_array() {
let page_writer = get_test_page_writer();

const TEST_TRUNCATE_LENGTH: usize = 1;

// Truncate values at 1 byte
let builder =
WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
let props = Arc::new(builder.build());
let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);

let mut data = vec![ByteArray::default(); 1];
// This is the expected min value
data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));

writer.write_batch(&data, None, None).unwrap();

writer.flush_data_pages().unwrap();

let r = writer.close().unwrap();

assert_eq!(1, r.rows_written);

let stats = r.metadata.statistics().expect("statistics");
assert!(stats.has_min_max_set());
assert_eq!(stats.null_count(), 0);
assert_eq!(stats.distinct_count(), None);
if let Statistics::ByteArray(_stats) = stats {
let min_value = _stats.min();
let max_value = _stats.max();

assert!(!_stats.min_is_exact());
assert!(!_stats.max_is_exact());

assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);

assert_eq!("B".as_bytes(), min_value.as_bytes());
assert_eq!("C".as_bytes(), max_value.as_bytes());
} else {
panic!("expecting Statistics::ByteArray");
}
}

#[test]
fn test_statistics_truncating_fixed_len_byte_array() {
let page_writer = get_test_page_writer();

const TEST_TRUNCATE_LENGTH: usize = 1;

// Truncate values at 1 byte
let builder =
WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
let props = Arc::new(builder.build());
let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

let mut data = vec![FixedLenByteArray::default(); 1];

const PSEUDO_DECIMAL_VALUE: i128 = 6541894651216648486512564456564654;
const PSEUDO_DECIMAL_BYTES: [u8; 16] = PSEUDO_DECIMAL_VALUE.to_be_bytes();

const EXPECTED_MIN: [u8; TEST_TRUNCATE_LENGTH] = [PSEUDO_DECIMAL_BYTES[0]]; // parquet specifies big-endian order for decimals
const EXPECTED_MAX: [u8; TEST_TRUNCATE_LENGTH] =
[PSEUDO_DECIMAL_BYTES[0].overflowing_add(1).0];

// This is the expected min value
data[0].set_data(Bytes::from(PSEUDO_DECIMAL_BYTES.as_slice()));

writer.write_batch(&data, None, None).unwrap();

writer.flush_data_pages().unwrap();

let r = writer.close().unwrap();

assert_eq!(1, r.rows_written);

let stats = r.metadata.statistics().expect("statistics");
assert!(stats.has_min_max_set());
assert_eq!(stats.null_count(), 0);
assert_eq!(stats.distinct_count(), None);
if let Statistics::FixedLenByteArray(_stats) = stats {
let min_value = _stats.min();
let max_value = _stats.max();

assert!(!_stats.min_is_exact());
assert!(!_stats.max_is_exact());

assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);

assert_eq!(EXPECTED_MIN.as_slice(), min_value.as_bytes());
assert_eq!(EXPECTED_MAX.as_slice(), max_value.as_bytes());

let reconstructed_min = i128::from_be_bytes([
min_value.as_bytes()[0],
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
]);

let reconstructed_max = i128::from_be_bytes([
max_value.as_bytes()[0],
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
]);

// check that the inner value is correctly bounded by the min/max
println!("min: {reconstructed_min} {PSEUDO_DECIMAL_VALUE}");
assert!(reconstructed_min <= PSEUDO_DECIMAL_VALUE);
println!("max {reconstructed_max} {PSEUDO_DECIMAL_VALUE}");
assert!(reconstructed_max >= PSEUDO_DECIMAL_VALUE);
} else {
panic!("expecting Statistics::FixedLenByteArray");
}
}

#[test]
fn test_send() {
fn test<T: Send>() {}
Expand Down
24 changes: 24 additions & 0 deletions parquet/src/file/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ pub const DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH: Option<usize> = Some(64);
pub const DEFAULT_BLOOM_FILTER_FPP: f64 = 0.05;
/// Default value for [`BloomFilterProperties::ndv`]
pub const DEFAULT_BLOOM_FILTER_NDV: u64 = 1_000_000_u64;
/// Default values for [`WriterProperties::statistics_truncate_length`]
pub const DEFAULT_STATISTICS_TRUNCATE_LENGTH: Option<usize> = None;

/// Parquet writer version.
///
Expand Down Expand Up @@ -136,6 +138,7 @@ pub struct WriterProperties {
column_properties: HashMap<ColumnPath, ColumnProperties>,
sorting_columns: Option<Vec<SortingColumn>>,
column_index_truncate_length: Option<usize>,
statistics_truncate_length: Option<usize>,
}

impl Default for WriterProperties {
Expand Down Expand Up @@ -241,6 +244,13 @@ impl WriterProperties {
self.column_index_truncate_length
}

/// Returns the maximum length of truncated min/max values in statistics.
///
/// `None` if truncation is disabled, must be greater than 0 otherwise.
pub fn statistics_truncate_length(&self) -> Option<usize> {
self.statistics_truncate_length
}

/// Returns encoding for a data page, when dictionary encoding is enabled.
/// This is not configurable.
#[inline]
Expand Down Expand Up @@ -334,6 +344,7 @@ pub struct WriterPropertiesBuilder {
column_properties: HashMap<ColumnPath, ColumnProperties>,
sorting_columns: Option<Vec<SortingColumn>>,
column_index_truncate_length: Option<usize>,
statistics_truncate_length: Option<usize>,
}

impl WriterPropertiesBuilder {
Expand All @@ -352,6 +363,7 @@ impl WriterPropertiesBuilder {
column_properties: HashMap::new(),
sorting_columns: None,
column_index_truncate_length: DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH,
statistics_truncate_length: DEFAULT_STATISTICS_TRUNCATE_LENGTH,
}
}

Expand All @@ -370,6 +382,7 @@ impl WriterPropertiesBuilder {
column_properties: self.column_properties,
sorting_columns: self.sorting_columns,
column_index_truncate_length: self.column_index_truncate_length,
statistics_truncate_length: self.statistics_truncate_length,
}
}

Expand Down Expand Up @@ -643,6 +656,17 @@ impl WriterPropertiesBuilder {
self.column_index_truncate_length = max_length;
self
}

/// Sets the max length of min/max value fields in statistics. Must be greater than 0.
/// If set to `None` - there's no effective limit.
pub fn set_statistics_truncate_length(mut self, max_length: Option<usize>) -> Self {
if let Some(value) = max_length {
assert!(value > 0, "Cannot have a 0 statistics truncate length. If you wish to disable min/max value truncation, set it to `None`.");
}

self.statistics_truncate_length = max_length;
self
}
}

/// Controls the level of statistics to be computed by the writer
Expand Down
Loading
Loading