Skip to content

Commit

Permalink
Add support for Spilling of Distinct Aggregations
Browse files Browse the repository at this point in the history
  • Loading branch information
aditi-pandit committed Nov 30, 2023
1 parent 168cda5 commit ef8c5f6
Show file tree
Hide file tree
Showing 10 changed files with 557 additions and 40 deletions.
7 changes: 0 additions & 7 deletions velox/core/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,13 +228,6 @@ void addSortingKeys(
} // namespace

bool AggregationNode::canSpill(const QueryConfig& queryConfig) const {
// TODO: Add spilling for aggregations over distinct inputs.
// https://github.com/facebookincubator/velox/issues/7454
for (const auto& aggregate : aggregates_) {
if (aggregate.distinct) {
return false;
}
}
// TODO: add spilling for pre-grouped aggregation later:
// https://github.com/facebookincubator/velox/issues/3264
return (isFinal() || isSingle()) && preGroupedKeys().empty() &&
Expand Down
36 changes: 36 additions & 0 deletions velox/exec/AddressableNonNullValueList.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,33 @@ HashStringAllocator::Position AddressableNonNullValueList::append(
return startAndFinish.first;
}

/// Appends an already serialized entry to the end of the list. Per the
/// caller contract, 'value' holds the hash followed by the serialized
/// complex-type value.
///
/// The first append starts a new write and remembers its header in
/// 'firstHeader_'; later appends extend the write at 'currentPosition_'.
/// Every call increments 'size_'.
///
/// @param value Serialized bytes to append verbatim.
/// @param allocator Allocator used for the write.
/// @return The start position reported by finishWrite for this entry.
HashStringAllocator::Position AddressableNonNullValueList::appendSerialized(
    const StringView& value,
    HashStringAllocator* allocator) {
  // Size of the first allocation of a list: 5 words plus a 4-byte header.
  // That is compact for small arrays (up to 5 bigints) and still efficient
  // when the list has to be extended (4 bigints and a next pointer). This
  // could be made adaptive, with smaller initial sizes for lots of small
  // arrays.
  static constexpr int kInitialSize = 44;

  ByteStream stream(allocator);
  if (firstHeader_) {
    // The list already has storage: keep writing where the last append ended.
    allocator->extendWrite(currentPosition_, stream);
  } else {
    // Very first entry: start a fresh write and remember where it begins.
    currentPosition_ = allocator->newWrite(stream, kInitialSize);
    firstHeader_ = currentPosition_.header;
  }

  // The payload is copied as is; it is expected to be hash + value already.
  stream.appendStringView(value);
  ++size_;

  // NOTE(review): the meaning of the literal 1024 passed to finishWrite is
  // not visible from this function - confirm against HashStringAllocator.
  const auto written = allocator->finishWrite(stream, 1024);
  currentPosition_ = written.second;
  return written.first;
}

namespace {

ByteInputStream prepareRead(
Expand Down Expand Up @@ -97,4 +124,13 @@ void AddressableNonNullValueList::read(
exec::ContainerRowSerde::deserialize(stream, index, &result);
}

// static
// Reads 'numBytes' bytes of the entry stored at 'position' into 'dest'. The
// read stream is prepared with the skip-hash flag set, so the bytes copied
// come from that stream rather than from the raw start of the entry.
void AddressableNonNullValueList::copy(
    HashStringAllocator::Position position,
    void* dest,
    size_t numBytes) {
  auto input = prepareRead(position, /*skipHash=*/true);
  input.readBytes(dest, numBytes);
}

} // namespace facebook::velox::aggregate::prestosql
19 changes: 19 additions & 0 deletions velox/exec/AddressableNonNullValueList.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,28 @@ class AddressableNonNullValueList {
}
};

/// Equality functor over list positions that compares only the hashes
/// obtained through readHash for the two positions.
///
/// NOTE(review): the stored values themselves are never compared here, so
/// two entries whose hashes collide are reported as equal - confirm that all
/// users of this functor can tolerate that.
struct HashEqualTo {
  bool operator()(
      HashStringAllocator::Position left,
      HashStringAllocator::Position right) const {
    const auto leftHash = AddressableNonNullValueList::readHash(left);
    const auto rightHash = AddressableNonNullValueList::readHash(right);
    return leftHash == rightHash;
  }
};

/// Appends a non-null value to the end of the list. The value is taken from
/// row 'index' of 'decoded' (presumably; the definition is not fully visible
/// here - confirm). Returns the position that can be used to access the
/// value later.
HashStringAllocator::Position append(
const DecodedVector& decoded,
vector_size_t index,
HashStringAllocator* allocator);

/// Appends a non-null, already serialized entry (hash followed by the value)
/// to the end of the list. The bytes of 'value' are written as is using
/// 'allocator'. Returns the position that can be used to access the value
/// later.
HashStringAllocator::Position appendSerialized(
const StringView& value,
HashStringAllocator* allocator);

/// Removes last element. 'position' must be a value returned from the latest
/// call to 'append'.
void removeLast(HashStringAllocator::Position position) {
Expand Down Expand Up @@ -80,6 +95,10 @@ class AddressableNonNullValueList {
BaseVector& result,
vector_size_t index);

/// Copies 'numBytes' bytes of the entry stored at 'position' into 'dest'.
/// The entry is read with its hash skipped. 'dest' must have room for at
/// least 'numBytes' bytes.
static void
copy(HashStringAllocator::Position position, void* dest, size_t numBytes);

void free(HashStringAllocator& allocator) {
if (size_ > 0) {
allocator.free(firstHeader_);
Expand Down
30 changes: 27 additions & 3 deletions velox/exec/DistinctAggregations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ class TypedDistinctAggregations : public DistinctAggregations {
sizeof(AccumulatorType),
false, // usesExternalMemory
1, // alignment
nullptr,
[](folly::Range<char**> /*groups*/, VectorPtr& /*result*/) {
VELOX_UNREACHABLE();
VARBINARY(),
[this](folly::Range<char**> groups, VectorPtr& result) {
extractForSpill(groups, result);
},
[this](folly::Range<char**> groups) {
for (auto* group : groups) {
Expand Down Expand Up @@ -103,6 +103,17 @@ class TypedDistinctAggregations : public DistinctAggregations {
inputForAccumulator_.reset();
}

/// Adds the serialized accumulator state found at row 'index' of 'input' to
/// the accumulator of 'group'.
///
/// @param group Row whose accumulator, located at 'offset_', is updated.
/// @param input Vector accessed as a flat vector of StringView.
/// @param index Row of 'input' to add.
void addSingleGroupSpillInput(
    char* group,
    const VectorPtr& input,
    vector_size_t index) override {
  auto* const groupAccumulator =
      reinterpret_cast<AccumulatorType*>(group + offset_);
  auto* const flatSerialized = input->asFlatVector<StringView>();

  // The tracker is bound to the row-size slot of 'group' and to the
  // allocator and must stay alive while the accumulator is updated;
  // presumably it charges the allocator growth to the row - confirm.
  RowSizeTracker<char, uint32_t> sizeTracker(
      group[rowSizeOffset_], *allocator_);
  groupAccumulator->addSerialized(*flatSerialized, index, allocator_);
}

void extractValues(folly::Range<char**> groups, const RowVectorPtr& result)
override {
SelectivityVector rows;
Expand Down Expand Up @@ -138,6 +149,8 @@ class TypedDistinctAggregations : public DistinctAggregations {
}
}

/// No-op: this implementation does nothing when cleared.
void clear() override {
  // Nothing to do.
}

private:
bool isSingleInputAggregate() const {
return aggregates_[0]->inputs.size() == 1;
Expand Down Expand Up @@ -185,6 +198,17 @@ class TypedDistinctAggregations : public DistinctAggregations {
return input->template asUnchecked<RowVector>()->children();
}

/// Writes the serialized accumulator of every group into 'result' so that it
/// can be spilled.
///
/// @param groups Rows whose accumulators, located at 'offset_', are
/// extracted. Group i is written to row i of 'result'.
/// @param result Output vector, accessed as a flat vector of StringView and
/// resized to the number of groups.
void extractForSpill(folly::Range<char**> groups, VectorPtr& result) const {
  // Take the group count once as a signed value so that the loop index and
  // its bound have the same signedness (the index is passed on as a row
  // number).
  const int numGroups = static_cast<int>(groups.size());

  auto* flat = result->asFlatVector<StringView>();
  flat->resize(numGroups);

  for (int i = 0; i < numGroups; ++i) {
    auto* accumulator =
        reinterpret_cast<AccumulatorType*>(groups[i] + offset_);
    accumulator->extractSerialized(result, i);
  }
}

memory::MemoryPool* const pool_;
const std::vector<AggregateInfo*> aggregates_;
const std::vector<column_index_t> inputs_;
Expand Down
7 changes: 7 additions & 0 deletions velox/exec/DistinctAggregations.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,18 @@ class DistinctAggregations {
const RowVectorPtr& input,
const SelectivityVector& rows) = 0;

/// Adds previously spilled state to the accumulator of a single group.
/// 'input' holds the spilled data and 'index' selects the row of 'input'
/// that belongs to 'group'. GroupingSet::updateRow calls this for each
/// non-null distinct aggregation while merging spilled rows.
virtual void addSingleGroupSpillInput(
char* group,
const VectorPtr& input,
vector_size_t index) = 0;

/// Computes aggregations and stores results in the specified 'result' vector.
virtual void extractValues(
folly::Range<char**> groups,
const RowVectorPtr& result) = 0;

/// Clears any state held by this object. GroupingSet calls this on every
/// non-null distinct aggregation while spilling, just before it clears its
/// table.
virtual void clear() = 0;

protected:
HashStringAllocator* allocator_;
int32_t offset_;
Expand Down
36 changes: 34 additions & 2 deletions velox/exec/GroupingSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -973,6 +973,11 @@ void GroupingSet::spill() {
if (sortedAggregations_) {
sortedAggregations_->clear();
}
for (const auto& agg : distinctAggregations_) {
if (agg != nullptr) {
agg->clear();
}
}
table_->clear();
}

Expand All @@ -997,6 +1002,17 @@ void GroupingSet::spill(const RowContainerIterator& rowIterator) {
spillConfig_->executor);

spiller_->spill(rowIterator);

if (sortedAggregations_) {
sortedAggregations_->clear();
}

for (const auto& agg : distinctAggregations_) {
if (agg != nullptr) {
agg->clear();
}
}

table_->clear();
}

Expand Down Expand Up @@ -1143,6 +1159,13 @@ void GroupingSet::initializeRow(SpillMergeStream& stream, char* row) {
sortedAggregations_->initializeNewGroups(
&row, folly::Range<const vector_size_t*>(&zero, 1));
}

for (const auto& agg : distinctAggregations_) {
if (agg != nullptr) {
agg->initializeNewGroups(
&row, folly::Range<const vector_size_t*>(&zero, 1));
}
}
}

void GroupingSet::extractSpillResult(const RowVectorPtr& result) {
Expand Down Expand Up @@ -1170,11 +1193,20 @@ void GroupingSet::updateRow(SpillMergeStream& input, char* row) {
}
mergeSelection_.setValid(input.currentIndex(), false);

int index = aggregates_.size() + keyChannels_.size();
if (sortedAggregations_ != nullptr) {
const auto& vector =
input.current().childAt(aggregates_.size() + keyChannels_.size());
const auto& vector = input.current().childAt(index);
sortedAggregations_->addSingleGroupSpillInput(
row, vector, input.currentIndex());
index++;
}

for (const auto& agg : distinctAggregations_) {
if (agg != nullptr) {
agg->addSingleGroupSpillInput(
row, input.current().childAt(index), input.currentIndex());
index++;
}
}
}

Expand Down
Loading

0 comments on commit ef8c5f6

Please sign in to comment.