From f3505c2af950423acee88c3097f91e9e66c5e1fe Mon Sep 17 00:00:00 2001 From: Deepak Majeti Date: Sun, 20 May 2018 23:19:03 -0400 Subject: [PATCH] PARQUET-979: Limit size of min, max or disable stats for long binary types Author: Deepak Majeti Closes #465 from majetideepak/PARQUET-979 and squashes the following commits: 3b18173 [Deepak Majeti] improve naming and ColumnProperties class a888aa4 [Deepak Majeti] Add an option to specify max stats size c103c4f [Deepak Majeti] make format cf0260c [Deepak Majeti] PARQUET-979: [C++] Limit size of min, max or disable stats for long binary types --- .../parquet/arrow/arrow-reader-writer-test.cc | 2 +- cpp/src/parquet/column_writer-test.cc | 70 +++++++++++++-- cpp/src/parquet/column_writer.cc | 9 +- cpp/src/parquet/properties.h | 88 ++++++++++++++----- cpp/src/parquet/statistics.h | 4 + cpp/src/parquet/test-specialization.h | 8 ++ 6 files changed, 148 insertions(+), 33 deletions(-) diff --git a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc index 47226a3c672cb..6d7e1ebb27cb3 100644 --- a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc @@ -1445,8 +1445,8 @@ TEST(TestArrowReadWrite, CoerceTimestampsAndSupportDeprecatedInt96) { using ::arrow::Schema; using ::arrow::Table; using ::arrow::TimeUnit; - using ::arrow::TimestampType; using ::arrow::TimestampBuilder; + using ::arrow::TimestampType; using ::arrow::default_memory_pool; auto timestamp_type = std::make_shared(TimeUnit::NANO); diff --git a/cpp/src/parquet/column_writer-test.cc b/cpp/src/parquet/column_writer-test.cc index 7a5f379ba3c58..aac582a26ff23 100644 --- a/cpp/src/parquet/column_writer-test.cc +++ b/cpp/src/parquet/column_writer-test.cc @@ -71,19 +71,21 @@ class TestPrimitiveWriter : public PrimitiveTypedTest { int64_t output_size = SMALL_SIZE, const ColumnProperties& column_properties = ColumnProperties()) { sink_.reset(new InMemoryOutputStream()); - metadata_ = ColumnChunkMetaDataBuilder::Make( - writer_properties_, this->descr_, reinterpret_cast(&thrift_metadata_)); - std::unique_ptr pager = - PageWriter::Open(sink_.get(), column_properties.codec, metadata_.get()); WriterProperties::Builder wp_builder; - if (column_properties.encoding == Encoding::PLAIN_DICTIONARY || - column_properties.encoding == Encoding::RLE_DICTIONARY) { + if (column_properties.encoding() == Encoding::PLAIN_DICTIONARY || + column_properties.encoding() == Encoding::RLE_DICTIONARY) { wp_builder.enable_dictionary(); } else { wp_builder.disable_dictionary(); - wp_builder.encoding(column_properties.encoding); + wp_builder.encoding(column_properties.encoding()); } + wp_builder.max_statistics_size(column_properties.max_statistics_size()); writer_properties_ = wp_builder.build(); + + metadata_ = ColumnChunkMetaDataBuilder::Make( + writer_properties_, this->descr_, reinterpret_cast(&thrift_metadata_)); + std::unique_ptr pager = + PageWriter::Open(sink_.get(), column_properties.compression(), metadata_.get()); std::shared_ptr writer = ColumnWriter::Make(metadata_.get(), std::move(pager), writer_properties_.get()); return std::static_pointer_cast>(writer); @@ -173,6 +175,16 @@ class TestPrimitiveWriter : public PrimitiveTypedTest { return metadata_accessor->num_values(); } + bool metadata_is_stats_set() { + // Metadata accessor must be created lazily. + // This is because the ColumnChunkMetaData semantics dictate the metadata object is + // complete (no changes to the metadata buffer can be made after instantiation) + ApplicationVersion app_version(this->writer_properties_->created_by()); + auto metadata_accessor = ColumnChunkMetaData::Make( + reinterpret_cast(&thrift_metadata_), this->descr_, &app_version); + return metadata_accessor->is_stats_set(); + } + std::vector metadata_encodings() { // Metadata accessor must be created lazily. // This is because the ColumnChunkMetaData semantics dictate the metadata object is @@ -520,6 +532,50 @@ TEST_F(TestBooleanValuesWriter, AlternateBooleanValues) { } } +// PARQUET-979 +// Prevent writing large stats +using TestByteArrayValuesWriter = TestPrimitiveWriter; +TEST_F(TestByteArrayValuesWriter, OmitStats) { + int min_len = 1024 * 4; + int max_len = 1024 * 8; + this->SetUpSchema(Repetition::REQUIRED); + auto writer = this->BuildWriter(); + + values_.resize(SMALL_SIZE); + InitWideByteArrayValues(SMALL_SIZE, this->values_, this->buffer_, min_len, max_len); + writer->WriteBatch(SMALL_SIZE, nullptr, nullptr, this->values_.data()); + writer->Close(); + + ASSERT_FALSE(this->metadata_is_stats_set()); +} + +TEST_F(TestByteArrayValuesWriter, LimitStats) { + int min_len = 1024 * 4; + int max_len = 1024 * 8; + this->SetUpSchema(Repetition::REQUIRED); + ColumnProperties column_properties; + column_properties.set_max_statistics_size(static_cast(max_len)); + auto writer = this->BuildWriter(SMALL_SIZE, column_properties); + + values_.resize(SMALL_SIZE); + InitWideByteArrayValues(SMALL_SIZE, this->values_, this->buffer_, min_len, max_len); + writer->WriteBatch(SMALL_SIZE, nullptr, nullptr, this->values_.data()); + writer->Close(); + + ASSERT_TRUE(this->metadata_is_stats_set()); +} + +TEST_F(TestByteArrayValuesWriter, CheckDefaultStats) { + this->SetUpSchema(Repetition::REQUIRED); + auto writer = this->BuildWriter(); + this->GenerateData(SMALL_SIZE); + + writer->WriteBatch(SMALL_SIZE, nullptr, nullptr, this->values_ptr_); + writer->Close(); + + ASSERT_TRUE(this->metadata_is_stats_set()); +} + void GenerateLevels(int min_repeat_factor, int max_repeat_factor, int max_level, std::vector& input_levels) { // for each repetition count upto max_repeat_factor diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index 4f2ef6c4699e3..8a1b56c8733d2 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -431,7 +431,14 @@ int64_t ColumnWriter::Close() { FlushBufferedDataPages(); EncodedStatistics chunk_statistics = GetChunkStatistics(); - if (chunk_statistics.is_set()) { + // From parquet-mr + // Don't write stats larger than the max size rather than truncating. The + // rationale is that some engines may use the minimum value in the page as + // the true minimum for aggregations and there is no way to mark that a + // value has been truncated and is a lower bound and not in the page. + if (chunk_statistics.is_set() && + chunk_statistics.max_stat_length() <= + properties_->max_statistics_size(descr_->path())) { metadata_->SetStatistics(SortOrder::SIGNED == descr_->sort_order(), chunk_statistics); } diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index a331aaebad8f7..83dc20574b9f0 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -84,6 +84,7 @@ static constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT = DEFAULT_PAGE_SIZE; static constexpr int64_t DEFAULT_WRITE_BATCH_SIZE = 1024; static constexpr int64_t DEFAULT_MAX_ROW_GROUP_LENGTH = 64 * 1024 * 1024; static constexpr bool DEFAULT_ARE_STATISTICS_ENABLED = true; +static constexpr int64_t DEFAULT_MAX_STATISTICS_SIZE = 4096; static constexpr Encoding::type DEFAULT_ENCODING = Encoding::PLAIN; static constexpr ParquetVersion::type DEFAULT_WRITER_VERSION = ParquetVersion::PARQUET_1_0; @@ -95,16 +96,46 @@ class PARQUET_EXPORT ColumnProperties { ColumnProperties(Encoding::type encoding = DEFAULT_ENCODING, Compression::type codec = DEFAULT_COMPRESSION_TYPE, bool dictionary_enabled = DEFAULT_IS_DICTIONARY_ENABLED, - bool statistics_enabled = DEFAULT_ARE_STATISTICS_ENABLED) - : encoding(encoding), - codec(codec), - dictionary_enabled(dictionary_enabled), - statistics_enabled(statistics_enabled) {} - - Encoding::type encoding; - Compression::type codec; - bool dictionary_enabled; - bool statistics_enabled; + bool statistics_enabled = DEFAULT_ARE_STATISTICS_ENABLED, + size_t max_stats_size = DEFAULT_MAX_STATISTICS_SIZE) + : encoding_(encoding), + codec_(codec), + dictionary_enabled_(dictionary_enabled), + statistics_enabled_(statistics_enabled), + max_stats_size_(max_stats_size) {} + + void set_encoding(Encoding::type encoding) { encoding_ = encoding; } + + void set_compression(Compression::type codec) { codec_ = codec; } + + void set_dictionary_enabled(bool dictionary_enabled) { + dictionary_enabled_ = dictionary_enabled; + } + + void set_statistics_enabled(bool statistics_enabled) { + statistics_enabled_ = statistics_enabled; + } + + void set_max_statistics_size(size_t max_stats_size) { + max_stats_size_ = max_stats_size; + } + + Encoding::type encoding() const { return encoding_; } + + Compression::type compression() const { return codec_; } + + bool dictionary_enabled() const { return dictionary_enabled_; } + + bool statistics_enabled() const { return statistics_enabled_; } + + size_t max_statistics_size() const { return max_stats_size_; } + + private: + Encoding::type encoding_; + Compression::type codec_; + bool dictionary_enabled_; + bool statistics_enabled_; + size_t max_stats_size_; }; class PARQUET_EXPORT WriterProperties { @@ -127,12 +158,12 @@ class PARQUET_EXPORT WriterProperties { } Builder* enable_dictionary() { - default_column_properties_.dictionary_enabled = true; + default_column_properties_.set_dictionary_enabled(true); return this; } Builder* disable_dictionary() { - default_column_properties_.dictionary_enabled = false; + default_column_properties_.set_dictionary_enabled(false); return this; } @@ -196,7 +227,7 @@ class PARQUET_EXPORT WriterProperties { throw ParquetException("Can't use dictionary encoding as fallback encoding"); } - default_column_properties_.encoding = encoding_type; + default_column_properties_.set_encoding(encoding_type); return this; } @@ -228,7 +259,12 @@ class PARQUET_EXPORT WriterProperties { } Builder* compression(Compression::type codec) { - default_column_properties_.codec = codec; + default_column_properties_.set_compression(codec); + return this; + } + + Builder* max_statistics_size(size_t max_stats_sz) { + default_column_properties_.set_max_statistics_size(max_stats_sz); return this; } @@ -243,12 +279,12 @@ class PARQUET_EXPORT WriterProperties { } Builder* enable_statistics() { - default_column_properties_.statistics_enabled = true; + default_column_properties_.set_statistics_enabled(true); return this; } Builder* disable_statistics() { - default_column_properties_.statistics_enabled = false; + default_column_properties_.set_statistics_enabled(false); return this; } @@ -280,12 +316,12 @@ class PARQUET_EXPORT WriterProperties { return it->second; }; - for (const auto& item : encodings_) get(item.first).encoding = item.second; - for (const auto& item : codecs_) get(item.first).codec = item.second; + for (const auto& item : encodings_) get(item.first).set_encoding(item.second); + for (const auto& item : codecs_) get(item.first).set_compression(item.second); for (const auto& item : dictionary_enabled_) - get(item.first).dictionary_enabled = item.second; + get(item.first).set_dictionary_enabled(item.second); for (const auto& item : statistics_enabled_) - get(item.first).statistics_enabled = item.second; + get(item.first).set_statistics_enabled(item.second); return std::shared_ptr( new WriterProperties(pool_, dictionary_pagesize_limit_, write_batch_size_, @@ -348,19 +384,23 @@ class PARQUET_EXPORT WriterProperties { } Encoding::type encoding(const std::shared_ptr& path) const { - return column_properties(path).encoding; + return column_properties(path).encoding(); } Compression::type compression(const std::shared_ptr& path) const { - return column_properties(path).codec; + return column_properties(path).compression(); } bool dictionary_enabled(const std::shared_ptr& path) const { - return column_properties(path).dictionary_enabled; + return column_properties(path).dictionary_enabled(); } bool statistics_enabled(const std::shared_ptr& path) const { - return column_properties(path).statistics_enabled; + return column_properties(path).statistics_enabled(); + } + + size_t max_statistics_size(const std::shared_ptr& path) const { + return column_properties(path).max_statistics_size(); } private: diff --git a/cpp/src/parquet/statistics.h b/cpp/src/parquet/statistics.h index 36f9e4459b4e5..4f9df726e9c09 100644 --- a/cpp/src/parquet/statistics.h +++ b/cpp/src/parquet/statistics.h @@ -18,6 +18,7 @@ #ifndef PARQUET_COLUMN_STATISTICS_H #define PARQUET_COLUMN_STATISTICS_H +#include #include #include #include @@ -52,6 +53,9 @@ class PARQUET_EXPORT EncodedStatistics { return has_min || has_max || has_null_count || has_distinct_count; } + // larger of the max_ and min_ stat values + inline size_t max_stat_length() { return std::max(max_->length(), min_->length()); } + inline EncodedStatistics& set_max(const std::string& value) { *max_ = value; has_max = true; diff --git a/cpp/src/parquet/test-specialization.h b/cpp/src/parquet/test-specialization.h index a6112a23197c7..3d88cfc9e3fb2 100644 --- a/cpp/src/parquet/test-specialization.h +++ b/cpp/src/parquet/test-specialization.h @@ -50,6 +50,14 @@ void inline InitValues(int num_values, vector& values, random_byte_array(num_values, 0, buffer.data(), values.data(), max_byte_array_len); } +void inline InitWideByteArrayValues(int num_values, vector& values, + vector& buffer, int min_len, int max_len) { + int num_bytes = static_cast(max_len + sizeof(uint32_t)); + size_t nbytes = num_values * num_bytes; + buffer.resize(nbytes); + random_byte_array(num_values, 0, buffer.data(), values.data(), min_len, max_len); +} + template <> void inline InitValues(int num_values, vector& values, vector& buffer) {