Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support reading Iceberg split with equality deletes #10898

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions velox/connectors/hive/HiveConnectorUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,7 @@ bool applyPartitionFilter(
VELOX_FAIL(
"Bad type {} for partition value: {}", type->kind(), partitionValue);
}
return true;
}

} // namespace
Expand All @@ -655,7 +656,8 @@ bool testFilters(
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
partitionKeysHandle) {
const auto totalRows = reader->numberOfRows();
const auto& fileTypeWithId = reader->typeWithId();
// const auto& fileTypeWithId = reader->typeWithId();
const auto& fileType = reader->rowType();
const auto& rowType = reader->rowType();
for (const auto& child : scanSpec->children()) {
if (child->filter()) {
Expand Down Expand Up @@ -685,14 +687,15 @@ bool testFilters(
return false;
}
} else {
const auto& typeWithId = fileTypeWithId->childByName(name);
const auto columnStats = reader->columnStatistics(typeWithId->id());
const auto& type = rowType->findChild(name);
const auto columnStats =
reader->columnStatistics(rowType->getChildIdx(name));
VLOG(0) << "testFilters column name: " << name
<< " type: " << type->kind() << " stats: " << columnStats.get()
<< (columnStats ? columnStats->toString() : "no stats");
if (columnStats != nullptr &&
!testFilter(
child->filter(),
columnStats.get(),
totalRows.value(),
typeWithId->type())) {
child->filter(), columnStats.get(), totalRows.value(), type)) {
VLOG(1) << "Skipping " << filePath
<< " based on stats and filter for column "
<< child->fieldName();
Expand Down
8 changes: 7 additions & 1 deletion velox/connectors/hive/HiveDataSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,10 @@ std::unique_ptr<SplitReader> HiveDataSource::createSplitReader() {
ioStats_,
fileHandleFactory_,
executor_,
scanSpec_);
scanSpec_,
remainingFilterExprSet_,
expressionEvaluator_,
totalRemainingFilterTime_);
}

std::unique_ptr<HivePartitionFunction> HiveDataSource::setupBucketConversion() {
Expand Down Expand Up @@ -283,8 +286,10 @@ void HiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
}

splitReader_ = createSplitReader();

// Split reader subclasses may need to use the reader options in prepareSplit
// so we initialize it beforehand.

splitReader_->configureReaderOptions(randomSkip_);
splitReader_->prepareSplit(metadataFilter_, runtimeStats_, rowIndexColumn_);
}
Expand Down Expand Up @@ -551,6 +556,7 @@ std::shared_ptr<wave::WaveDataSource> HiveDataSource::toWaveDataSource() {
void HiveDataSource::registerWaveDelegateHook(WaveDelegateHookFunction hook) {
waveDelegateHook_ = hook;
}

std::shared_ptr<wave::WaveDataSource> toWaveDataSource();

} // namespace facebook::velox::connector::hive
2 changes: 1 addition & 1 deletion velox/connectors/hive/HiveDataSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ class HiveDataSource : public DataSource {
subfields_;
SubfieldFilters filters_;
std::shared_ptr<common::MetadataFilter> metadataFilter_;
std::unique_ptr<exec::ExprSet> remainingFilterExprSet_;
std::shared_ptr<exec::ExprSet> remainingFilterExprSet_;
RowVectorPtr emptyOutput_;
dwio::common::RuntimeStatistics runtimeStats_;
std::atomic<uint64_t> totalRemainingFilterTime_{0};
Expand Down
15 changes: 13 additions & 2 deletions velox/connectors/hive/SplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ std::unique_ptr<SplitReader> SplitReader::create(
const std::shared_ptr<io::IoStatistics>& ioStats,
FileHandleFactory* fileHandleFactory,
folly::Executor* executor,
const std::shared_ptr<common::ScanSpec>& scanSpec) {
const std::shared_ptr<common::ScanSpec>& scanSpec,
std::shared_ptr<exec::ExprSet>& remainingFilterExprSet,
core::ExpressionEvaluator* expressionEvaluator,
std::atomic<uint64_t>& totalRemainingFilterTime) {
// Create the SplitReader based on hiveSplit->customSplitInfo["table_format"]
if (hiveSplit->customSplitInfo.count("table_format") > 0 &&
hiveSplit->customSplitInfo["table_format"] == "hive-iceberg") {
Expand All @@ -88,7 +91,10 @@ std::unique_ptr<SplitReader> SplitReader::create(
ioStats,
fileHandleFactory,
executor,
scanSpec);
scanSpec,
remainingFilterExprSet,
expressionEvaluator,
totalRemainingFilterTime);
} else {
return std::unique_ptr<SplitReader>(new SplitReader(
hiveSplit,
Expand Down Expand Up @@ -179,6 +185,11 @@ void SplitReader::resetSplit() {
hiveSplit_.reset();
}

std::shared_ptr<const dwio::common::TypeWithId> SplitReader::baseFileSchema() {
VELOX_CHECK_NOT_NULL(baseReader_.get());
return baseReader_->typeWithId();
}

int64_t SplitReader::estimatedRowSize() const {
if (!baseRowReader_) {
return DataSource::kUnknownRowSize;
Expand Down
7 changes: 6 additions & 1 deletion velox/connectors/hive/SplitReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ class SplitReader {
const std::shared_ptr<io::IoStatistics>& ioStats,
FileHandleFactory* fileHandleFactory,
folly::Executor* executor,
const std::shared_ptr<common::ScanSpec>& scanSpec);
const std::shared_ptr<common::ScanSpec>& scanSpec,
std::shared_ptr<exec::ExprSet>& remainingFilterExprSet,
core::ExpressionEvaluator* expressionEvaluator,
std::atomic<uint64_t>& totalRemainingFilterTime);

virtual ~SplitReader() = default;

Expand All @@ -89,6 +92,8 @@ class SplitReader {

void resetSplit();

std::shared_ptr<const dwio::common::TypeWithId> baseFileSchema();

int64_t estimatedRowSize() const;

void updateRuntimeStats(dwio::common::RuntimeStatistics& stats) const;
Expand Down
9 changes: 7 additions & 2 deletions velox/connectors/hive/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

velox_add_library(velox_hive_iceberg_splitreader IcebergSplitReader.cpp
IcebergSplit.cpp PositionalDeleteFileReader.cpp)
velox_add_library(
velox_hive_iceberg_splitreader
IcebergSplitReader.cpp
IcebergSplit.cpp
PositionalDeleteFileReader.cpp
EqualityDeleteFileReader.cpp
FilterUtil.cpp)

velox_link_libraries(velox_hive_iceberg_splitreader velox_connector
Folly::folly)
Expand Down
227 changes: 227 additions & 0 deletions velox/connectors/hive/iceberg/EqualityDeleteFileReader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/connectors/hive/iceberg/EqualityDeleteFileReader.h"

#include "velox/connectors/hive/HiveConnectorUtil.h"
#include "velox/connectors/hive/iceberg/FilterUtil.h"
#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h"
#include "velox/dwio/common/ReaderFactory.h"
#include "velox/dwio/common/TypeUtils.h"

using namespace facebook::velox::common;
using namespace facebook::velox::core;
using namespace facebook::velox::exec;

namespace facebook::velox::connector::hive::iceberg {

static constexpr const int kMaxBatchRows = 10'000;

EqualityDeleteFileReader::EqualityDeleteFileReader(
const IcebergDeleteFile& deleteFile,
std::shared_ptr<const dwio::common::TypeWithId> baseFileSchema,
FileHandleFactory* fileHandleFactory,
folly::Executor* executor,
const ConnectorQueryCtx* connectorQueryCtx,
const std::shared_ptr<const HiveConfig> hiveConfig,
std::shared_ptr<io::IoStatistics> ioStats,
dwio::common::RuntimeStatistics& runtimeStats,
const std::string& connectorId)
: deleteFile_(deleteFile),
baseFileSchema_(baseFileSchema),
connectorQueryCtx_(connectorQueryCtx),
hiveConfig_(hiveConfig),
deleteSplit_(nullptr),
fileHandleFactory_(fileHandleFactory),
executor_(executor),
pool_(connectorQueryCtx->memoryPool()),
ioStats_(ioStats),
deleteRowReader_(nullptr) {
VELOX_CHECK(deleteFile_.content == FileContent::kEqualityDeletes);

if (deleteFile_.recordCount == 0) {
return;
}

std::unordered_set<int32_t> equalityFieldIds(
deleteFile_.equalityFieldIds.begin(), deleteFile_.equalityFieldIds.end());
auto deleteFieldSelector = [&equalityFieldIds](size_t index) {
return equalityFieldIds.find(static_cast<int32_t>(index)) !=
equalityFieldIds.end();
};
auto deleteFileSchema = dwio::common::typeutils::buildSelectedType(
baseFileSchema_, deleteFieldSelector);

rowType_ = std::static_pointer_cast<const RowType>(deleteFileSchema->type());

// TODO: push down filter if previous delete file contains this one. E.g.
// previous equality delete file has a=1, and this file also contains
// columns a, then a!=1 can be pushed as a filter when reading this delete
// file.

auto scanSpec = std::make_shared<common::ScanSpec>("<root>");
scanSpec->addAllChildFields(rowType_->asRow());

deleteSplit_ = std::make_shared<HiveConnectorSplit>(
connectorId,
deleteFile_.filePath,
deleteFile_.fileFormat,
0,
deleteFile_.fileSizeInBytes);

// Create the Reader and RowReader

dwio::common::ReaderOptions deleteReaderOpts(pool_);
configureReaderOptions(
deleteReaderOpts,
hiveConfig_,
connectorQueryCtx_,
rowType_,
deleteSplit_);

auto deleteFileHandleCachePtr =
fileHandleFactory_->generate(deleteFile_.filePath);
auto deleteFileInput = createBufferedInput(
*deleteFileHandleCachePtr,
deleteReaderOpts,
connectorQueryCtx_,
ioStats_,
executor_);

auto deleteReader =
dwio::common::getReaderFactory(deleteReaderOpts.fileFormat())
->createReader(std::move(deleteFileInput), deleteReaderOpts);

dwio::common::RowReaderOptions deleteRowReaderOpts;
configureRowReaderOptions(
{},
scanSpec,
nullptr,
rowType_,
deleteSplit_,
hiveConfig_,
connectorQueryCtx_->sessionProperties(),
deleteRowReaderOpts);

deleteRowReader_.reset();
deleteRowReader_ = deleteReader->createRowReader(deleteRowReaderOpts);
}

void EqualityDeleteFileReader::readDeleteValues(
SubfieldFilters& subfieldFilters,
std::vector<core::TypedExprPtr>& expressionInputs) {
VELOX_CHECK(deleteRowReader_);
VELOX_CHECK(deleteSplit_);

if (!deleteValuesOutput_) {
deleteValuesOutput_ = BaseVector::create(rowType_, 0, pool_);
}

// TODO:: verfiy if the field is a sub-field. Velox currently doesn't support
// pushing down filters to sub-fields
if (rowType_->size() == 1) {
// Construct the IN list filter that can be pushed down to the base file
// readers, then update the baseFileScanSpec.
readSingleColumnDeleteValues(subfieldFilters);
} else {
readMultipleColumnDeleteValues(expressionInputs);
}

deleteSplit_.reset();
}

void EqualityDeleteFileReader::readSingleColumnDeleteValues(
SubfieldFilters& subfieldFilters) {
std::unique_ptr<Filter> filter = std::make_unique<AlwaysTrue>();
while (deleteRowReader_->next(kMaxBatchRows, deleteValuesOutput_)) {
if (deleteValuesOutput_->size() == 0) {
continue;
}

deleteValuesOutput_->loadedVector();
auto vector =
std::dynamic_pointer_cast<RowVector>(deleteValuesOutput_)->childAt(0);
auto name = rowType_->nameOf(0);

auto typeKind = vector->type()->kind();
VELOX_CHECK(
typeKind != TypeKind::DOUBLE && typeKind != TypeKind::REAL,
"Iceberg does not allow DOUBLE or REAL columns as the equality delete columns: {} : {}",
name,
typeKind);

auto notExistsFilter =
createNotExistsFilter(vector, 0, deleteValuesOutput_->size(), typeKind);
filter = filter->mergeWith(notExistsFilter.get());
}

if (filter->kind() != FilterKind::kAlwaysTrue) {
if (subfieldFilters.find(common::Subfield(rowType_->nameOf(0))) !=
subfieldFilters.end()) {
subfieldFilters[common::Subfield(rowType_->nameOf(0))] =
subfieldFilters[common::Subfield(rowType_->nameOf(0))]->mergeWith(
filter.get());
} else {
subfieldFilters[common::Subfield(rowType_->nameOf(0))] =
std::move(filter);
}
}
}

void EqualityDeleteFileReader::readMultipleColumnDeleteValues(
std::vector<core::TypedExprPtr>& expressionInputs) {
auto numDeleteFields = rowType_->size();
VELOX_CHECK_GT(
numDeleteFields,
0,
"Iceberg equality delete file should have at least one field.");

// TODO: logical expression simplifications
while (deleteRowReader_->next(kMaxBatchRows, deleteValuesOutput_)) {
if (deleteValuesOutput_->size() == 0) {
continue;
}

deleteValuesOutput_->loadedVector();
auto rowVector = std::dynamic_pointer_cast<RowVector>(deleteValuesOutput_);
auto numDeletedValues = rowVector->childAt(0)->size();

for (int i = 0; i < numDeletedValues; i++) {
std::vector<core::TypedExprPtr> disconjunctInputs;

for (int j = 0; j < numDeleteFields; j++) {
auto type = rowType_->childAt(j);
auto name = rowType_->nameOf(j);
auto value = BaseVector::wrapInConstant(1, i, rowVector->childAt(j));

std::vector<core::TypedExprPtr> isNotEqualInputs;
isNotEqualInputs.push_back(
std::make_shared<FieldAccessTypedExpr>(type, name));
isNotEqualInputs.push_back(std::make_shared<ConstantTypedExpr>(value));
auto isNotEqualExpr =
std::make_shared<CallTypedExpr>(BOOLEAN(), isNotEqualInputs, "neq");

disconjunctInputs.push_back(isNotEqualExpr);
}

auto disconjunctNotEqualExpr =
std::make_shared<CallTypedExpr>(BOOLEAN(), disconjunctInputs, "or");
expressionInputs.push_back(disconjunctNotEqualExpr);
}
}
}

} // namespace facebook::velox::connector::hive::iceberg
Loading
Loading