Skip to content

Commit

Permalink
Add support for spilling aggregations over sorted inputs
Browse the repository at this point in the history
  • Loading branch information
mbasmanova committed Nov 11, 2023
1 parent 484e634 commit 885809f
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 31 deletions.
5 changes: 2 additions & 3 deletions velox/core/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,11 +228,10 @@ void addSortingKeys(
} // namespace

bool AggregationNode::canSpill(const QueryConfig& queryConfig) const {
// TODO: add spilling for aggregations with sorting or with distinct later:
// TODO: Add spilling for aggregations over distinct inputs.
// https://github.com/facebookincubator/velox/issues/7454
// https://github.com/facebookincubator/velox/issues/7455
for (const auto& aggregate : aggregates_) {
if (aggregate.distinct || !aggregate.sortingKeys.empty()) {
if (aggregate.distinct) {
return false;
}
}
Expand Down
15 changes: 15 additions & 0 deletions velox/exec/GroupingSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -977,6 +977,9 @@ void GroupingSet::spill() {
}
++(*numSpillRuns_);
spiller_->spill();
if (sortedAggregations_) {
sortedAggregations_->clear();
}
table_->clear();
}

Expand Down Expand Up @@ -1141,6 +1144,11 @@ void GroupingSet::initializeRow(SpillMergeStream& stream, char* row) {
aggregate.function->initializeNewGroups(
&row, folly::Range<const vector_size_t*>(&zero, 1));
}

if (sortedAggregations_ != nullptr) {
sortedAggregations_->initializeNewGroups(
&row, folly::Range<const vector_size_t*>(&zero, 1));
}
}

void GroupingSet::extractSpillResult(const RowVectorPtr& result) {
Expand All @@ -1167,6 +1175,13 @@ void GroupingSet::updateRow(SpillMergeStream& input, char* row) {
row, mergeSelection_, mergeArgs_, false);
}
mergeSelection_.setValid(input.currentIndex(), false);

if (sortedAggregations_ != nullptr) {
const auto& vector =
input.current().childAt(aggregates_.size() + keyChannels_.size());
sortedAggregations_->addSingleGroupSpillInput(
row, vector, input.currentIndex());
}
}

void GroupingSet::abandonPartialAggregation() {
Expand Down
73 changes: 65 additions & 8 deletions velox/exec/SortedAggregations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,13 @@ struct RowPointers {
}
}

std::vector<char*> read(HashStringAllocator& allocator) {
void read(HashStringAllocator& allocator, folly::Range<char**> rows) {
ByteStream stream(&allocator);
HashStringAllocator::prepareRead(firstBlock, stream);

std::vector<char*> rows(size);
for (auto i = 0; i < size; ++i) {
rows[i] = reinterpret_cast<char*>(stream.read<uintptr_t>());
}
return rows;
}
};
} // namespace
Expand Down Expand Up @@ -127,9 +125,9 @@ Accumulator SortedAggregations::accumulator() const {
sizeof(RowPointers),
false,
1,
nullptr,
[](folly::Range<char**> /*groups*/, VectorPtr& /*result*/) {
VELOX_UNREACHABLE();
ARRAY(VARBINARY()),
[this](folly::Range<char**> groups, VectorPtr& result) {
extractForSpill(groups, result);
},
[this](folly::Range<char**> groups) {
for (auto* group : groups) {
Expand All @@ -139,6 +137,46 @@ Accumulator SortedAggregations::accumulator() const {
}};
}

// Serializes the accumulated input rows of each group into 'result' so they
// can be spilled. 'result' is an ARRAY(VARBINARY) vector: entry i is an array
// whose elements are the serialized input rows accumulated for groups[i].
void SortedAggregations::extractForSpill(
    folly::Range<char**> groups,
    VectorPtr& result) const {
  auto* arrayVector = result->as<ArrayVector>();
  arrayVector->resize(groups.size());

  auto* rawOffsets =
      arrayVector->mutableOffsets(groups.size())->asMutable<vector_size_t>();
  auto* rawSizes =
      arrayVector->mutableSizes(groups.size())->asMutable<vector_size_t>();

  // First pass: record each group's size and starting offset and compute the
  // total number of accumulated rows across all groups.
  vector_size_t numRows = 0;
  for (auto i = 0; i < groups.size(); ++i) {
    auto* accumulator = reinterpret_cast<RowPointers*>(groups[i] + offset_);
    rawOffsets[i] = numRows;
    rawSizes[i] = accumulator->size;
    numRows += accumulator->size;
  }

  // Second pass: gather the row pointers of every group into one contiguous
  // list, each group writing into its own sub-range.
  std::vector<char*> groupRows(numRows);
  vector_size_t nextRow = 0;
  for (auto i = 0; i < groups.size(); ++i) {
    auto* accumulator = reinterpret_cast<RowPointers*>(groups[i] + offset_);
    accumulator->read(
        *allocator_,
        folly::Range(groupRows.data() + nextRow, accumulator->size));
    nextRow += accumulator->size;
  }

  // Serialize all gathered rows into the array's elements vector.
  auto& elementsVector = arrayVector->elements();
  elementsVector->resize(numRows);
  inputData_->extractSerializedRows(
      folly::Range(groupRows.data(), groupRows.size()), elementsVector);
}

// Releases all input rows accumulated so far. Used to free memory after the
// rows have been spilled via extractForSpill().
void SortedAggregations::clear() {
  inputData_->clear();
}

void SortedAggregations::initializeNewGroups(
char** groups,
folly::Range<const vector_size_t*> indices) {
Expand Down Expand Up @@ -196,6 +234,22 @@ void SortedAggregations::addSingleGroupInput(
}
}

void SortedAggregations::addSingleGroupSpillInput(
char* group,
const VectorPtr& input,
vector_size_t index) {
auto* arrayVector = input->as<ArrayVector>();
auto* elementsVector = arrayVector->elements()->asFlatVector<StringView>();

const auto size = arrayVector->sizeAt(index);
const auto offset = arrayVector->offsetAt(index);
for (auto i = 0; i < size; ++i) {
char* newRow = inputData_->newRow();
inputData_->storeSerializedRow(*elementsVector, offset + i, newRow);
addNewRow(group, newRow);
}
}

bool SortedAggregations::compareRowsWithKeys(
const char* lhs,
const char* rhs,
Expand Down Expand Up @@ -295,6 +349,7 @@ void SortedAggregations::extractValues(
const RowVectorPtr& result) {
raw_vector<int32_t> temp;
SelectivityVector rows;
std::vector<char*> groupRows;
for (const auto& [sortingSpec, aggregates] : aggregates_) {
std::vector<VectorPtr> inputVectors;
size_t numInputColumns = 0;
Expand All @@ -305,8 +360,10 @@ void SortedAggregations::extractValues(

// For each group, sort inputs, add them to aggregate.
for (auto* group : groups) {
auto groupRows =
reinterpret_cast<RowPointers*>(group + offset_)->read(*allocator_);
auto* accumulator = reinterpret_cast<RowPointers*>(group + offset_);
groupRows.resize(accumulator->size);
accumulator->read(
*allocator_, folly::Range(groupRows.data(), groupRows.size()));

sortSingleGroup(groupRows, sortingSpec);

Expand Down
10 changes: 10 additions & 0 deletions velox/exec/SortedAggregations.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,20 @@ class SortedAggregations {

void addSingleGroupInput(char* group, const RowVectorPtr& input);

void addSingleGroupSpillInput(
char* group,
const VectorPtr& input,
vector_size_t index);

void noMoreInput();

/// Sorts input row for the specified groups, computes aggregations and stores
/// results in the specified 'result' vector.
void extractValues(folly::Range<char**> groups, const RowVectorPtr& result);

/// Clears all data accumulated so far. Used to release memory after spilling.
void clear();

private:
void addNewRow(char* group, char* newRow);

Expand All @@ -90,6 +98,8 @@ class SortedAggregations {
const AggregateInfo& aggregate,
std::vector<VectorPtr>& inputVectors);

void extractForSpill(folly::Range<char**> groups, VectorPtr& result) const;

struct Hash {
static uint64_t hashSortOrder(const core::SortOrder& sortOrder) {
return bits::hashMix(
Expand Down
53 changes: 33 additions & 20 deletions velox/exec/tests/AggregationTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1804,27 +1804,40 @@ TEST_F(AggregationTest, spillingForAggrsWithSorting) {
auto vectors = makeVectors(rowType_, 100, 10);
createDuckDbTable(vectors);
auto spillDirectory = exec::test::TempDirectoryPath::create();

core::PlanNodeId aggrNodeId;
auto task =
AssertQueryBuilder(duckDbQueryRunner_)
.spillDirectory(spillDirectory->path)
.config(QueryConfig::kSpillEnabled, "true")
.config(QueryConfig::kAggregationSpillEnabled, "true")
.config(QueryConfig::kTestingSpillPct, "100")
.plan(PlanBuilder()
.values(vectors)
.singleAggregation({"c1"}, {"count(c0 ORDER BY c2)"}, {})
.capturePlanNodeId(aggrNodeId)
.planNode())
.assertResults(
"SELECT c1, count(c0 ORDER BY c2) FROM tmp GROUP BY c1");
// Verify that spilling is not triggered.
const auto& queryConfig = task->queryCtx()->queryConfig();
ASSERT_TRUE(queryConfig.spillEnabled());
ASSERT_TRUE(queryConfig.aggregationSpillEnabled());
ASSERT_EQ(100, queryConfig.testingSpillPct());
ASSERT_EQ(toPlanStats(task->taskStats()).at(aggrNodeId).spilledBytes, 0);
OperatorTestBase::deleteTaskAndCheckSpillDirectory(task);

auto testPlan = [&](const core::PlanNodePtr& plan, const std::string& sql) {
SCOPED_TRACE(sql);
auto task = AssertQueryBuilder(duckDbQueryRunner_)
.spillDirectory(spillDirectory->path)
.config(QueryConfig::kSpillEnabled, "true")
.config(QueryConfig::kAggregationSpillEnabled, "true")
.config(QueryConfig::kTestingSpillPct, "100")
.plan(plan)
.assertResults(sql);

auto taskStats = exec::toPlanStats(task->taskStats());
auto& stats = taskStats.at(aggrNodeId);
checkSpillStats(stats, true);
OperatorTestBase::deleteTaskAndCheckSpillDirectory(task);
};

auto plan = PlanBuilder()
.values(vectors)
.singleAggregation({"c0"}, {"array_agg(c1 ORDER BY c1)"}, {})
.capturePlanNodeId(aggrNodeId)
.planNode();
testPlan(plan, "SELECT c0, array_agg(c1 ORDER BY c1) FROM tmp GROUP BY 1");

plan = PlanBuilder()
.values(vectors)
.project({"c0 % 7", "c1"})
.singleAggregation({"p0"}, {"array_agg(c1 ORDER BY c1)"}, {})
.capturePlanNodeId(aggrNodeId)
.planNode();
testPlan(
plan, "SELECT c0 % 7, array_agg(c1 ORDER BY c1) FROM tmp GROUP BY 1");
}

TEST_F(AggregationTest, distinctSpillWithMemoryLimit) {
Expand Down

0 comments on commit 885809f

Please sign in to comment.