Skip to content

Commit

Permalink
Support reading plain encoded INT96 timestamp from Parquet file (#10850)
Browse files Browse the repository at this point in the history
Summary:
Follow-up for: #4680.

Pull Request resolved: #10850

Reviewed By: pedroerp

Differential Revision: D65686495

Pulled By: kevinwilfong

fbshipit-source-id: dc26cb4aeda01c0219b63ea86567402b7f361a90
  • Loading branch information
rui-mo authored and facebook-github-bot committed Nov 12, 2024
1 parent 3884939 commit da39954
Show file tree
Hide file tree
Showing 8 changed files with 240 additions and 120 deletions.
17 changes: 17 additions & 0 deletions velox/dwio/common/IntDecoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ class IntDecoder {
template <typename T>
T readInt();

// Reads Int96 timestamp composed of days and nanos as int128_t.
int128_t readInt96();

template <typename T>
T readVInt();

Expand Down Expand Up @@ -438,12 +441,26 @@ inline T IntDecoder<isSigned>::readInt() {
return readLittleEndianFromBigEndian<T>();
} else {
if constexpr (std::is_same_v<T, int128_t>) {
if (numBytes_ == 12) {
VELOX_DCHECK(!useVInts_, "Int96 should not be VInt encoded.");
return readInt96();
}
VELOX_NYI();
}
return readLongLE();
}
}

// Consumes 12 bytes via readByte() and packs them into a 128-bit value, with
// the first byte read landing in the least significant position. The upper 32
// bits of the result are always zero.
template <bool isSigned>
inline int128_t IntDecoder<isSigned>::readInt96() {
  constexpr int kInt96Bits = 12 * 8;
  uint128_t bits = 0;
  for (int shift = 0; shift < kInt96Bits; shift += 8) {
    const auto byte = readByte();
    // Mask before widening so only the low 8 bits of the byte are merged in.
    bits |= static_cast<uint128_t>(byte & BASE_256_MASK) << shift;
  }
  return static_cast<int128_t>(bits);
}

template <bool isSigned>
template <typename T>
inline T IntDecoder<isSigned>::readVInt() {
Expand Down
69 changes: 43 additions & 26 deletions velox/dwio/common/SelectiveIntegerColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,23 +127,26 @@ void SelectiveIntegerColumnReader::processFilter(
ExtractValues extractValues,
const RowSet& rows) {
if (filter == nullptr) {
readHelper<Reader, velox::common::AlwaysTrue, isDense>(
&dwio::common::alwaysTrue(), rows, extractValues);
static_cast<Reader*>(this)
->template readHelper<Reader, velox::common::AlwaysTrue, isDense>(
&dwio::common::alwaysTrue(), rows, extractValues);
return;
}

switch (filter->kind()) {
case velox::common::FilterKind::kAlwaysTrue:
readHelper<Reader, velox::common::AlwaysTrue, isDense>(
filter, rows, extractValues);
static_cast<Reader*>(this)
->template readHelper<Reader, velox::common::AlwaysTrue, isDense>(
filter, rows, extractValues);
break;
case velox::common::FilterKind::kIsNull:
if constexpr (kEncodingHasNulls) {
filterNulls<int64_t>(
rows, true, !std::is_same_v<decltype(extractValues), DropValues>);
} else {
readHelper<Reader, velox::common::IsNull, isDense>(
filter, rows, extractValues);
static_cast<Reader*>(this)
->template readHelper<Reader, velox::common::IsNull, isDense>(
filter, rows, extractValues);
}
break;
case velox::common::FilterKind::kIsNotNull:
Expand All @@ -152,41 +155,55 @@ void SelectiveIntegerColumnReader::processFilter(
std::is_same_v<decltype(extractValues), DropValues>) {
filterNulls<int64_t>(rows, false, false);
} else {
readHelper<Reader, velox::common::IsNotNull, isDense>(
filter, rows, extractValues);
static_cast<Reader*>(this)
->template readHelper<Reader, velox::common::IsNotNull, isDense>(
filter, rows, extractValues);
}
break;
case velox::common::FilterKind::kBigintRange:
readHelper<Reader, velox::common::BigintRange, isDense>(
filter, rows, extractValues);
static_cast<Reader*>(this)
->template readHelper<Reader, velox::common::BigintRange, isDense>(
filter, rows, extractValues);
break;
case velox::common::FilterKind::kNegatedBigintRange:
readHelper<Reader, velox::common::NegatedBigintRange, isDense>(
filter, rows, extractValues);
static_cast<Reader*>(this)
->template readHelper<
Reader,
velox::common::NegatedBigintRange,
isDense>(filter, rows, extractValues);
break;
case velox::common::FilterKind::kBigintValuesUsingHashTable:
readHelper<Reader, velox::common::BigintValuesUsingHashTable, isDense>(
filter, rows, extractValues);
static_cast<Reader*>(this)
->template readHelper<
Reader,
velox::common::BigintValuesUsingHashTable,
isDense>(filter, rows, extractValues);
break;
case velox::common::FilterKind::kBigintValuesUsingBitmask:
readHelper<Reader, velox::common::BigintValuesUsingBitmask, isDense>(
filter, rows, extractValues);
static_cast<Reader*>(this)
->template readHelper<
Reader,
velox::common::BigintValuesUsingBitmask,
isDense>(filter, rows, extractValues);
break;
case velox::common::FilterKind::kNegatedBigintValuesUsingHashTable:
readHelper<
Reader,
velox::common::NegatedBigintValuesUsingHashTable,
isDense>(filter, rows, extractValues);
static_cast<Reader*>(this)
->template readHelper<
Reader,
velox::common::NegatedBigintValuesUsingHashTable,
isDense>(filter, rows, extractValues);
break;
case velox::common::FilterKind::kNegatedBigintValuesUsingBitmask:
readHelper<
Reader,
velox::common::NegatedBigintValuesUsingBitmask,
isDense>(filter, rows, extractValues);
static_cast<Reader*>(this)
->template readHelper<
Reader,
velox::common::NegatedBigintValuesUsingBitmask,
isDense>(filter, rows, extractValues);
break;
default:
readHelper<Reader, velox::common::Filter, isDense>(
filter, rows, extractValues);
static_cast<Reader*>(this)
->template readHelper<Reader, velox::common::Filter, isDense>(
filter, rows, extractValues);
break;
}
}
Expand Down
19 changes: 6 additions & 13 deletions velox/dwio/parquet/reader/PageReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
break;
}
case thrift::Type::INT96: {
auto numVeloxBytes = dictionary_.numValues * sizeof(Timestamp);
auto numVeloxBytes = dictionary_.numValues * sizeof(int128_t);
dictionary_.values = AlignedBuffer::allocate<char>(numVeloxBytes, &pool_);
auto numBytes = dictionary_.numValues * sizeof(Int96Timestamp);
if (pageData_) {
Expand All @@ -392,23 +392,16 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
}
// Expand the Parquet type length values to Velox type length.
// We start from the end to allow in-place expansion.
auto values = dictionary_.values->asMutable<Timestamp>();
auto values = dictionary_.values->asMutable<int128_t>();
auto parquetValues = dictionary_.values->asMutable<char>();

for (auto i = dictionary_.numValues - 1; i >= 0; --i) {
// Convert the timestamp into seconds and nanos since the Unix epoch,
// 00:00:00.000000 on 1 January 1970.
int64_t nanos;
int128_t result = 0;
memcpy(
&nanos,
&result,
parquetValues + i * sizeof(Int96Timestamp),
sizeof(int64_t));
int32_t days;
memcpy(
&days,
parquetValues + i * sizeof(Int96Timestamp) + sizeof(int64_t),
sizeof(int32_t));
values[i] = Timestamp::fromDaysAndNanos(days, nanos);
sizeof(Int96Timestamp));
values[i] = result;
}
break;
}
Expand Down
72 changes: 68 additions & 4 deletions velox/dwio/parquet/reader/TimestampColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,31 @@
#include "velox/dwio/parquet/reader/ParquetColumnReader.h"

namespace facebook::velox::parquet {
namespace {

// Timestamp range filter that can be evaluated directly against Parquet Int96
// timestamps delivered in their raw int128_t form.
class ParquetInt96TimestampRange : public common::TimestampRange {
 public:
  // @param lower Inclusive lower bound of the accepted range.
  // @param upper Inclusive upper bound of the accepted range.
  // @param nullAllowed When true, null values pass the filter.
  ParquetInt96TimestampRange(
      const Timestamp& lower,
      const Timestamp& upper,
      bool nullAllowed)
      : TimestampRange(lower, upper, nullAllowed) {}

  // Decodes the int128_t carrier into a Timestamp and checks it against the
  // [lower, upper] bounds. The low 64 bits of 'value' are taken as nanos and
  // the 32 bits directly above them as days.
  bool testInt128(int128_t value) const final override {
    const auto nanos = static_cast<uint64_t>(value);
    const auto days = static_cast<int32_t>(value >> 64);
    const auto decoded = Timestamp::fromDaysAndNanos(days, nanos);
    if (!(decoded >= this->lower())) {
      return false;
    }
    return decoded <= this->upper();
  }
};

} // namespace

class TimestampColumnReader : public IntegerColumnReader {
public:
Expand Down Expand Up @@ -49,8 +74,14 @@ class TimestampColumnReader : public IntegerColumnReader {
if (resultVector->isNullAt(i)) {
continue;
}
const auto timestamp = rawValues[i];
uint64_t nanos = timestamp.getNanos();

// Convert int128_t to Timestamp by extracting days and nanos.
const int128_t encoded = reinterpret_cast<int128_t&>(rawValues[i]);
const int32_t days = static_cast<int32_t>(encoded >> 64);
uint64_t nanos = encoded & ((((1ULL << 63) - 1ULL) << 1) + 1);
const auto timestamp = Timestamp::fromDaysAndNanos(days, nanos);

nanos = timestamp.getNanos();
switch (timestampPrecision_) {
case TimestampPrecision::kMilliseconds:
nanos = nanos / 1'000'000 * 1'000'000;
Expand All @@ -65,14 +96,47 @@ class TimestampColumnReader : public IntegerColumnReader {
}
}

// Reads 'rows' through a ColumnVisitor over int128_t values. When 'filter' is
// a TimestampRange, a ParquetInt96TimestampRange with the same bounds and
// null handling is built and used in its place, so the range test runs on the
// raw Int96 representation. Any other filter is passed through as TFilter.
template <
    typename Reader,
    typename TFilter,
    bool isDense,
    typename ExtractValues>
void readHelper(
    velox::common::Filter* filter,
    const RowSet& rows,
    ExtractValues extractValues) {
  auto* timestampRange = dynamic_cast<common::TimestampRange*>(filter);
  if (timestampRange == nullptr) {
    // Not a timestamp range: visit with the caller-selected filter type.
    using Visitor = dwio::common::
        ColumnVisitor<int128_t, TFilter, ExtractValues, isDense>;
    this->readWithVisitor(
        rows,
        Visitor(
            *reinterpret_cast<TFilter*>(filter), this, rows, extractValues));
    return;
  }

  // The substitute filter lives on the stack for the duration of the read.
  ParquetInt96TimestampRange int96Range(
      timestampRange->lower(),
      timestampRange->upper(),
      timestampRange->nullAllowed());
  using Int96Visitor = dwio::common::ColumnVisitor<
      int128_t,
      ParquetInt96TimestampRange,
      ExtractValues,
      isDense>;
  this->readWithVisitor(
      rows, Int96Visitor(int96Range, this, rows, extractValues));
}

// Reads the values selected by 'rows' starting at 'offset' and advances
// readOffset_ past the last requested row. The incoming nulls argument is
// ignored. readCommon is instantiated with TimestampColumnReader (not
// IntegerColumnReader) so that this class's readHelper is the one selected,
// and it is invoked exactly once per call.
void read(
    int64_t offset,
    const RowSet& rows,
    const uint64_t* /*incomingNulls*/) override {
  auto& data = formatData_->as<ParquetData>();
  // Use int128_t as a workaround. Timestamp in Velox is of 16-byte length.
  prepareRead<int128_t>(offset, rows, nullptr);
  readCommon<TimestampColumnReader, true>(rows);
  readOffset_ += rows.back() + 1;
}

Expand Down
Binary file not shown.
14 changes: 14 additions & 0 deletions velox/dwio/parquet/tests/reader/E2EFilterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,20 @@ TEST_F(E2EFilterTest, integerDictionary) {
20);
}

// End-to-end filter test over two timestamp columns written as Int96 with
// dictionary encoding turned off in the writer options (presumably so the
// values end up plain encoded; confirm against the writer).
TEST_F(E2EFilterTest, timestampDirect) {
  options_.writeInt96AsTimestamp = true;
  options_.dataPageSize = 4 * 1024;
  options_.enableDictionary = false;

  testWithTypes(
      "timestamp_val_0:timestamp,timestamp_val_1:timestamp",
      [&]() {},
      true,
      {"timestamp_val_0", "timestamp_val_1"},
      20);
}

TEST_F(E2EFilterTest, timestampDictionary) {
options_.dataPageSize = 4 * 1024;
options_.writeInt96AsTimestamp = true;
Expand Down
Loading

0 comments on commit da39954

Please sign in to comment.