Skip to content

Commit

Permalink
PARQUET-979: Limit size of min, max or disable stats for long binary types
Browse files Browse the repository at this point in the history

Author: Deepak Majeti <deepak.majeti@hpe.com>

Closes apache#465 from majetideepak/PARQUET-979 and squashes the following commits:

3b18173 [Deepak Majeti] improve naming and ColumnProperties class
a888aa4 [Deepak Majeti] Add an option to specify max stats size
c103c4f [Deepak Majeti] make format
cf0260c [Deepak Majeti] PARQUET-979: [C++] Limit size of min, max or disable stats for long binary types

Change-Id: I2843608b1848b6d85ac226064dd4c5ec93e40f47
  • Loading branch information
Deepak Majeti committed May 21, 2018
1 parent b9e80c8 commit 076fbc6
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 33 deletions.
2 changes: 1 addition & 1 deletion cpp/src/parquet/arrow/arrow-reader-writer-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1445,8 +1445,8 @@ TEST(TestArrowReadWrite, CoerceTimestampsAndSupportDeprecatedInt96) {
using ::arrow::Schema;
using ::arrow::Table;
using ::arrow::TimeUnit;
using ::arrow::TimestampType;
using ::arrow::TimestampBuilder;
using ::arrow::TimestampType;
using ::arrow::default_memory_pool;

auto timestamp_type = std::make_shared<TimestampType>(TimeUnit::NANO);
Expand Down
70 changes: 63 additions & 7 deletions cpp/src/parquet/column_writer-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,19 +71,21 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
int64_t output_size = SMALL_SIZE,
const ColumnProperties& column_properties = ColumnProperties()) {
sink_.reset(new InMemoryOutputStream());
metadata_ = ColumnChunkMetaDataBuilder::Make(
writer_properties_, this->descr_, reinterpret_cast<uint8_t*>(&thrift_metadata_));
std::unique_ptr<PageWriter> pager =
PageWriter::Open(sink_.get(), column_properties.codec, metadata_.get());
WriterProperties::Builder wp_builder;
if (column_properties.encoding == Encoding::PLAIN_DICTIONARY ||
column_properties.encoding == Encoding::RLE_DICTIONARY) {
if (column_properties.encoding() == Encoding::PLAIN_DICTIONARY ||
column_properties.encoding() == Encoding::RLE_DICTIONARY) {
wp_builder.enable_dictionary();
} else {
wp_builder.disable_dictionary();
wp_builder.encoding(column_properties.encoding);
wp_builder.encoding(column_properties.encoding());
}
wp_builder.max_statistics_size(column_properties.max_statistics_size());
writer_properties_ = wp_builder.build();

metadata_ = ColumnChunkMetaDataBuilder::Make(
writer_properties_, this->descr_, reinterpret_cast<uint8_t*>(&thrift_metadata_));
std::unique_ptr<PageWriter> pager =
PageWriter::Open(sink_.get(), column_properties.compression(), metadata_.get());
std::shared_ptr<ColumnWriter> writer =
ColumnWriter::Make(metadata_.get(), std::move(pager), writer_properties_.get());
return std::static_pointer_cast<TypedColumnWriter<TestType>>(writer);
Expand Down Expand Up @@ -173,6 +175,16 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
return metadata_accessor->num_values();
}

bool metadata_is_stats_set() {
// Metadata accessor must be created lazily.
// This is because the ColumnChunkMetaData semantics dictate the metadata object is
// complete (no changes to the metadata buffer can be made after instantiation)
ApplicationVersion app_version(this->writer_properties_->created_by());
auto metadata_accessor = ColumnChunkMetaData::Make(
reinterpret_cast<const uint8_t*>(&thrift_metadata_), this->descr_, &app_version);
return metadata_accessor->is_stats_set();
}

std::vector<Encoding::type> metadata_encodings() {
// Metadata accessor must be created lazily.
// This is because the ColumnChunkMetaData semantics dictate the metadata object is
Expand Down Expand Up @@ -520,6 +532,50 @@ TEST_F(TestBooleanValuesWriter, AlternateBooleanValues) {
}
}

// PARQUET-979
// Prevent writing large stats
using TestByteArrayValuesWriter = TestPrimitiveWriter<ByteArrayType>;

// Statistics whose min/max exceed the default max_statistics_size must be
// omitted entirely rather than truncated: readers may treat a stored
// min/max as an exact bound.
TEST_F(TestByteArrayValuesWriter, OmitStats) {
  const int min_len = 1024 * 4;
  const int max_len = 1024 * 8;
  this->SetUpSchema(Repetition::REQUIRED);
  auto writer = this->BuildWriter();

  // Each value is 4-8 KiB, so min/max exceed the default stats size cap.
  this->values_.resize(SMALL_SIZE);
  InitWideByteArrayValues(SMALL_SIZE, this->values_, this->buffer_, min_len, max_len);
  writer->WriteBatch(SMALL_SIZE, nullptr, nullptr, this->values_.data());
  writer->Close();

  ASSERT_FALSE(this->metadata_is_stats_set());
}

// When the per-column max_statistics_size is raised to cover the value
// sizes, the statistics must be written.
TEST_F(TestByteArrayValuesWriter, LimitStats) {
  const int min_len = 1024 * 4;
  const int max_len = 1024 * 8;
  this->SetUpSchema(Repetition::REQUIRED);
  ColumnProperties column_properties;
  // Raise the cap so 4-8 KiB min/max values fit under it.
  column_properties.set_max_statistics_size(static_cast<size_t>(max_len));
  auto writer = this->BuildWriter(SMALL_SIZE, column_properties);

  this->values_.resize(SMALL_SIZE);
  InitWideByteArrayValues(SMALL_SIZE, this->values_, this->buffer_, min_len, max_len);
  writer->WriteBatch(SMALL_SIZE, nullptr, nullptr, this->values_.data());
  writer->Close();

  ASSERT_TRUE(this->metadata_is_stats_set());
}

// Normally-sized values stay under the default stats size cap, so chunk
// statistics are written by default.
TEST_F(TestByteArrayValuesWriter, CheckDefaultStats) {
  this->SetUpSchema(Repetition::REQUIRED);
  auto writer = this->BuildWriter();

  this->GenerateData(SMALL_SIZE);
  writer->WriteBatch(SMALL_SIZE, nullptr, nullptr, this->values_ptr_);
  writer->Close();

  ASSERT_TRUE(this->metadata_is_stats_set());
}

void GenerateLevels(int min_repeat_factor, int max_repeat_factor, int max_level,
std::vector<int16_t>& input_levels) {
// for each repetition count upto max_repeat_factor
Expand Down
9 changes: 8 additions & 1 deletion cpp/src/parquet/column_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,14 @@ int64_t ColumnWriter::Close() {
FlushBufferedDataPages();

EncodedStatistics chunk_statistics = GetChunkStatistics();
if (chunk_statistics.is_set()) {
// From parquet-mr
// Don't write stats larger than the max size rather than truncating. The
// rationale is that some engines may use the minimum value in the page as
// the true minimum for aggregations and there is no way to mark that a
// value has been truncated and is a lower bound and not in the page.
if (chunk_statistics.is_set() &&
chunk_statistics.max_stat_length() <=
properties_->max_statistics_size(descr_->path())) {
metadata_->SetStatistics(SortOrder::SIGNED == descr_->sort_order(),
chunk_statistics);
}
Expand Down
88 changes: 64 additions & 24 deletions cpp/src/parquet/properties.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ static constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT = DEFAULT_PAGE_SIZE;
static constexpr int64_t DEFAULT_WRITE_BATCH_SIZE = 1024;
static constexpr int64_t DEFAULT_MAX_ROW_GROUP_LENGTH = 64 * 1024 * 1024;
static constexpr bool DEFAULT_ARE_STATISTICS_ENABLED = true;
static constexpr int64_t DEFAULT_MAX_STATISTICS_SIZE = 4096;
static constexpr Encoding::type DEFAULT_ENCODING = Encoding::PLAIN;
static constexpr ParquetVersion::type DEFAULT_WRITER_VERSION =
ParquetVersion::PARQUET_1_0;
Expand All @@ -95,16 +96,46 @@ class PARQUET_EXPORT ColumnProperties {
ColumnProperties(Encoding::type encoding = DEFAULT_ENCODING,
Compression::type codec = DEFAULT_COMPRESSION_TYPE,
bool dictionary_enabled = DEFAULT_IS_DICTIONARY_ENABLED,
bool statistics_enabled = DEFAULT_ARE_STATISTICS_ENABLED)
: encoding(encoding),
codec(codec),
dictionary_enabled(dictionary_enabled),
statistics_enabled(statistics_enabled) {}

Encoding::type encoding;
Compression::type codec;
bool dictionary_enabled;
bool statistics_enabled;
bool statistics_enabled = DEFAULT_ARE_STATISTICS_ENABLED,
size_t max_stats_size = DEFAULT_MAX_STATISTICS_SIZE)
: encoding_(encoding),
codec_(codec),
dictionary_enabled_(dictionary_enabled),
statistics_enabled_(statistics_enabled),
max_stats_size_(max_stats_size) {}

void set_encoding(Encoding::type encoding) { encoding_ = encoding; }

void set_compression(Compression::type codec) { codec_ = codec; }

void set_dictionary_enabled(bool dictionary_enabled) {
dictionary_enabled_ = dictionary_enabled;
}

void set_statistics_enabled(bool statistics_enabled) {
statistics_enabled_ = statistics_enabled;
}

void set_max_statistics_size(size_t max_stats_size) {
max_stats_size_ = max_stats_size;
}

Encoding::type encoding() const { return encoding_; }

Compression::type compression() const { return codec_; }

bool dictionary_enabled() const { return dictionary_enabled_; }

bool statistics_enabled() const { return statistics_enabled_; }

size_t max_statistics_size() const { return max_stats_size_; }

private:
Encoding::type encoding_;
Compression::type codec_;
bool dictionary_enabled_;
bool statistics_enabled_;
size_t max_stats_size_;
};

class PARQUET_EXPORT WriterProperties {
Expand All @@ -127,12 +158,12 @@ class PARQUET_EXPORT WriterProperties {
}

// Turn on dictionary encoding as the default for all columns.
Builder* enable_dictionary() {
  default_column_properties_.set_dictionary_enabled(true);
  return this;
}

// Turn off dictionary encoding as the default for all columns.
Builder* disable_dictionary() {
  default_column_properties_.set_dictionary_enabled(false);
  return this;
}

Expand Down Expand Up @@ -196,7 +227,7 @@ class PARQUET_EXPORT WriterProperties {
throw ParquetException("Can't use dictionary encoding as fallback encoding");
}

default_column_properties_.encoding = encoding_type;
default_column_properties_.set_encoding(encoding_type);
return this;
}

Expand Down Expand Up @@ -228,7 +259,12 @@ class PARQUET_EXPORT WriterProperties {
}

// Set the default compression codec for all columns.
Builder* compression(Compression::type codec) {
  default_column_properties_.set_compression(codec);
  return this;
}

// Cap the serialized size of min/max statistics for all columns; chunks
// whose stats exceed this are written without statistics (PARQUET-979).
Builder* max_statistics_size(size_t max_stats_sz) {
  default_column_properties_.set_max_statistics_size(max_stats_sz);
  return this;
}

Expand All @@ -243,12 +279,12 @@ class PARQUET_EXPORT WriterProperties {
}

// Enable writing of min/max statistics as the default for all columns.
Builder* enable_statistics() {
  default_column_properties_.set_statistics_enabled(true);
  return this;
}

// Disable writing of min/max statistics as the default for all columns.
Builder* disable_statistics() {
  default_column_properties_.set_statistics_enabled(false);
  return this;
}

Expand Down Expand Up @@ -280,12 +316,12 @@ class PARQUET_EXPORT WriterProperties {
return it->second;
};

for (const auto& item : encodings_) get(item.first).encoding = item.second;
for (const auto& item : codecs_) get(item.first).codec = item.second;
for (const auto& item : encodings_) get(item.first).set_encoding(item.second);
for (const auto& item : codecs_) get(item.first).set_compression(item.second);
for (const auto& item : dictionary_enabled_)
get(item.first).dictionary_enabled = item.second;
get(item.first).set_dictionary_enabled(item.second);
for (const auto& item : statistics_enabled_)
get(item.first).statistics_enabled = item.second;
get(item.first).set_statistics_enabled(item.second);

return std::shared_ptr<WriterProperties>(
new WriterProperties(pool_, dictionary_pagesize_limit_, write_batch_size_,
Expand Down Expand Up @@ -348,19 +384,23 @@ class PARQUET_EXPORT WriterProperties {
}

// Per-column accessors: each resolves the effective ColumnProperties for
// `path` (a column-specific override if one exists, else the defaults)
// and returns the requested field.
Encoding::type encoding(const std::shared_ptr<schema::ColumnPath>& path) const {
  return column_properties(path).encoding();
}

Compression::type compression(const std::shared_ptr<schema::ColumnPath>& path) const {
  return column_properties(path).compression();
}

bool dictionary_enabled(const std::shared_ptr<schema::ColumnPath>& path) const {
  return column_properties(path).dictionary_enabled();
}

bool statistics_enabled(const std::shared_ptr<schema::ColumnPath>& path) const {
  return column_properties(path).statistics_enabled();
}

size_t max_statistics_size(const std::shared_ptr<schema::ColumnPath>& path) const {
  return column_properties(path).max_statistics_size();
}

private:
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/parquet/statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#ifndef PARQUET_COLUMN_STATISTICS_H
#define PARQUET_COLUMN_STATISTICS_H

#include <algorithm>
#include <cstdint>
#include <memory>
#include <string>
Expand Down Expand Up @@ -52,6 +53,9 @@ class PARQUET_EXPORT EncodedStatistics {
return has_min || has_max || has_null_count || has_distinct_count;
}

// larger of the max_ and min_ stat values
inline size_t max_stat_length() { return std::max(max_->length(), min_->length()); }

inline EncodedStatistics& set_max(const std::string& value) {
*max_ = value;
has_max = true;
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/parquet/test-specialization.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ void inline InitValues<ByteArray>(int num_values, vector<ByteArray>& values,
random_byte_array(num_values, 0, buffer.data(), values.data(), max_byte_array_len);
}

void inline InitWideByteArrayValues(int num_values, vector<ByteArray>& values,
vector<uint8_t>& buffer, int min_len, int max_len) {
int num_bytes = static_cast<int>(max_len + sizeof(uint32_t));
size_t nbytes = num_values * num_bytes;
buffer.resize(nbytes);
random_byte_array(num_values, 0, buffer.data(), values.data(), min_len, max_len);
}

template <>
void inline InitValues<FLBA>(int num_values, vector<FLBA>& values,
vector<uint8_t>& buffer) {
Expand Down

0 comments on commit 076fbc6

Please sign in to comment.