Add support for spilling aggregations over sorted inputs #7526

Closed
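This PR enables disk spilling for aggregations that use per-aggregate ORDER BY clauses. On the spill path, the input rows buffered by SortedAggregations are serialized per group into an extra ARRAY(VARBINARY()) column written after the grouping keys and regular accumulators (extractForSpill); on the merge path, GroupingSet::updateRow feeds that column back through SortedAggregations::addSingleGroupSpillInput so sorting and aggregation proceed as usual. Spilling for aggregations over distinct inputs remains unsupported (issue 7454).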
5 changes: 2 additions & 3 deletions velox/core/PlanNode.cpp
@@ -228,11 +228,10 @@ void addSortingKeys(
} // namespace

bool AggregationNode::canSpill(const QueryConfig& queryConfig) const {
- // TODO: add spilling for aggregations with sorting or with distinct later:
+ // TODO: Add spilling for aggregations over distinct inputs.
// https://github.com/facebookincubator/velox/issues/7454
- // https://github.com/facebookincubator/velox/issues/7455
for (const auto& aggregate : aggregates_) {
- if (aggregate.distinct || !aggregate.sortingKeys.empty()) {
+ if (aggregate.distinct) {
return false;
}
}
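With sorting keys no longer blocking AggregationNode::canSpill, a plan like the one below (taken from the updated test in velox/exec/tests/AggregationTest.cpp; PlanBuilder is Velox's test-plan utility) can now take the spill path when spilling is enabled:

```cpp
auto plan = PlanBuilder()
                .values(vectors)
                .singleAggregation({"c0"}, {"array_agg(c1 ORDER BY c1)"}, {})
                .planNode();
```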
15 changes: 15 additions & 0 deletions velox/exec/GroupingSet.cpp
@@ -977,6 +977,9 @@ void GroupingSet::spill() {
}
++(*numSpillRuns_);
spiller_->spill();
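// The spiller has just written out the table's rows, including the
// sorted-aggregation accumulators serialized by
// SortedAggregations::extractForSpill, so the buffered input rows can be
// released along with the table.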
if (sortedAggregations_) {
sortedAggregations_->clear();
}
table_->clear();
}

@@ -1141,6 +1144,11 @@ void GroupingSet::initializeRow(SpillMergeStream& stream, char* row) {
aggregate.function->initializeNewGroups(
&row, folly::Range<const vector_size_t*>(&zero, 1));
}

if (sortedAggregations_ != nullptr) {
sortedAggregations_->initializeNewGroups(
&row, folly::Range<const vector_size_t*>(&zero, 1));
}
}

void GroupingSet::extractSpillResult(const RowVectorPtr& result) {
@@ -1167,6 +1175,13 @@ void GroupingSet::updateRow(SpillMergeStream& input, char* row) {
row, mergeSelection_, mergeArgs_, false);
}
mergeSelection_.setValid(input.currentIndex(), false);

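// Inputs for sorted aggregations are spilled as an extra ARRAY(VARBINARY())
// column appended after the grouping keys and the regular aggregate columns.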
if (sortedAggregations_ != nullptr) {
const auto& vector =
input.current().childAt(aggregates_.size() + keyChannels_.size());
sortedAggregations_->addSingleGroupSpillInput(
row, vector, input.currentIndex());
}
}

void GroupingSet::abandonPartialAggregation() {
73 changes: 65 additions & 8 deletions velox/exec/SortedAggregations.cpp
@@ -50,15 +50,13 @@ struct RowPointers {
}
}

- std::vector<char*> read(HashStringAllocator& allocator) {
+ void read(HashStringAllocator& allocator, folly::Range<char**> rows) {
ByteStream stream(&allocator);
HashStringAllocator::prepareRead(firstBlock, stream);

- std::vector<char*> rows(size);
for (auto i = 0; i < size; ++i) {
rows[i] = reinterpret_cast<char*>(stream.read<uintptr_t>());
}
- return rows;
}
};
} // namespace
@@ -127,9 +125,9 @@ Accumulator SortedAggregations::accumulator() const {
sizeof(RowPointers),
false,
1,
- nullptr,
- [](folly::Range<char**> /*groups*/, VectorPtr& /*result*/) {
- VELOX_UNREACHABLE();
+ ARRAY(VARBINARY()),
+ [this](folly::Range<char**> groups, VectorPtr& result) {
+ extractForSpill(groups, result);
},
[this](folly::Range<char**> groups) {
for (auto* group : groups) {
@@ -139,6 +137,46 @@
}};
}

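// Serializes each group's accumulated input rows into one ARRAY(VARBINARY())
// element: the first pass fills per-group offsets and sizes, the second
// gathers the row pointers, and extractSerializedRows writes the rows into
// the flat VARBINARY elements vector.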
void SortedAggregations::extractForSpill(
folly::Range<char**> groups,
VectorPtr& result) const {
auto* arrayVector = result->as<ArrayVector>();
arrayVector->resize(groups.size());

auto* rawOffsets =
arrayVector->mutableOffsets(groups.size())->asMutable<vector_size_t>();
auto* rawSizes =
arrayVector->mutableSizes(groups.size())->asMutable<vector_size_t>();

vector_size_t offset = 0;
for (auto i = 0; i < groups.size(); ++i) {
auto* accumulator = reinterpret_cast<RowPointers*>(groups[i] + offset_);
rawSizes[i] = accumulator->size;
rawOffsets[i] = offset;
offset += accumulator->size;
}

std::vector<char*> groupRows(offset);

offset = 0;
for (auto i = 0; i < groups.size(); ++i) {
auto* accumulator = reinterpret_cast<RowPointers*>(groups[i] + offset_);
accumulator->read(
*allocator_,
folly::Range(groupRows.data() + offset, accumulator->size));
offset += accumulator->size;
}

auto& elementsVector = arrayVector->elements();
elementsVector->resize(offset);
inputData_->extractSerializedRows(
folly::Range(groupRows.data(), groupRows.size()), elementsVector);
}

void SortedAggregations::clear() {
inputData_->clear();
}

void SortedAggregations::initializeNewGroups(
char** groups,
folly::Range<const vector_size_t*> indices) {
@@ -196,6 +234,22 @@ void SortedAggregations::addSingleGroupInput(
}
}

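// Merge-path counterpart of extractForSpill: deserializes the rows spilled
// for one group and appends them to the group's accumulator, after which
// sorting and extraction proceed as for in-memory input.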
void SortedAggregations::addSingleGroupSpillInput(
char* group,
const VectorPtr& input,
vector_size_t index) {
auto* arrayVector = input->as<ArrayVector>();
auto* elementsVector = arrayVector->elements()->asFlatVector<StringView>();

const auto size = arrayVector->sizeAt(index);
const auto offset = arrayVector->offsetAt(index);
for (auto i = 0; i < size; ++i) {
char* newRow = inputData_->newRow();
inputData_->storeSerializedRow(*elementsVector, offset + i, newRow);
addNewRow(group, newRow);
}
}

bool SortedAggregations::compareRowsWithKeys(
const char* lhs,
const char* rhs,
@@ -295,6 +349,7 @@ void SortedAggregations::extractValues(
const RowVectorPtr& result) {
raw_vector<int32_t> temp;
SelectivityVector rows;
+ std::vector<char*> groupRows;
for (const auto& [sortingSpec, aggregates] : aggregates_) {
std::vector<VectorPtr> inputVectors;
size_t numInputColumns = 0;
@@ -305,8 +360,10 @@

// For each group, sort inputs, add them to aggregate.
for (auto* group : groups) {
- auto groupRows =
- reinterpret_cast<RowPointers*>(group + offset_)->read(*allocator_);
+ auto* accumulator = reinterpret_cast<RowPointers*>(group + offset_);
+ groupRows.resize(accumulator->size);
+ accumulator->read(
+ *allocator_, folly::Range(groupRows.data(), groupRows.size()));

sortSingleGroup(groupRows, sortingSpec);

10 changes: 10 additions & 0 deletions velox/exec/SortedAggregations.h
@@ -62,12 +62,20 @@ class SortedAggregations {

void addSingleGroupInput(char* group, const RowVectorPtr& input);

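/// Adds spilled input rows for the specified group. 'input' is the
/// ARRAY(VARBINARY()) column produced when spilling sorted-aggregation
/// state; 'index' is the position of the group's entry in that column.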
void addSingleGroupSpillInput(
char* group,
const VectorPtr& input,
vector_size_t index);

void noMoreInput();

/// Sorts input rows for the specified groups, computes aggregations and stores
/// results in the specified 'result' vector.
void extractValues(folly::Range<char**> groups, const RowVectorPtr& result);

/// Clears all data accumulated so far. Used to release memory after spilling.
void clear();

private:
void addNewRow(char* group, char* newRow);

@@ -90,6 +98,8 @@ class SortedAggregations {
const AggregateInfo& aggregate,
std::vector<VectorPtr>& inputVectors);

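/// Serializes each group's accumulated input rows into one element of an
/// ARRAY(VARBINARY()) 'result' vector. Installed as the spill extract
/// function of the Accumulator returned by accumulator().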
void extractForSpill(folly::Range<char**> groups, VectorPtr& result) const;

struct Hash {
static uint64_t hashSortOrder(const core::SortOrder& sortOrder) {
return bits::hashMix(
53 changes: 33 additions & 20 deletions velox/exec/tests/AggregationTest.cpp
@@ -1814,27 +1814,40 @@ TEST_F(AggregationTest, spillingForAggrsWithSorting) {
auto vectors = makeVectors(rowType_, 100, 10);
createDuckDbTable(vectors);
auto spillDirectory = exec::test::TempDirectoryPath::create();

core::PlanNodeId aggrNodeId;
- auto task =
- AssertQueryBuilder(duckDbQueryRunner_)
- .spillDirectory(spillDirectory->path)
- .config(QueryConfig::kSpillEnabled, "true")
- .config(QueryConfig::kAggregationSpillEnabled, "true")
- .config(QueryConfig::kTestingSpillPct, "100")
- .plan(PlanBuilder()
- .values(vectors)
- .singleAggregation({"c1"}, {"count(c0 ORDER BY c2)"}, {})
- .capturePlanNodeId(aggrNodeId)
- .planNode())
- .assertResults(
- "SELECT c1, count(c0 ORDER BY c2) FROM tmp GROUP BY c1");
- // Verify that spilling is not triggered.
- const auto& queryConfig = task->queryCtx()->queryConfig();
- ASSERT_TRUE(queryConfig.spillEnabled());
- ASSERT_TRUE(queryConfig.aggregationSpillEnabled());
- ASSERT_EQ(100, queryConfig.testingSpillPct());
- ASSERT_EQ(toPlanStats(task->taskStats()).at(aggrNodeId).spilledBytes, 0);
- OperatorTestBase::deleteTaskAndCheckSpillDirectory(task);

+ auto testPlan = [&](const core::PlanNodePtr& plan, const std::string& sql) {
+ SCOPED_TRACE(sql);
+ auto task = AssertQueryBuilder(duckDbQueryRunner_)
+ .spillDirectory(spillDirectory->path)
+ .config(QueryConfig::kSpillEnabled, "true")
+ .config(QueryConfig::kAggregationSpillEnabled, "true")
+ .config(QueryConfig::kTestingSpillPct, "100")
+ .plan(plan)
+ .assertResults(sql);
+
+ auto taskStats = exec::toPlanStats(task->taskStats());
+ auto& stats = taskStats.at(aggrNodeId);
+ checkSpillStats(stats, true);
+ OperatorTestBase::deleteTaskAndCheckSpillDirectory(task);
+ };
+
+ auto plan = PlanBuilder()
+ .values(vectors)
+ .singleAggregation({"c0"}, {"array_agg(c1 ORDER BY c1)"}, {})
+ .capturePlanNodeId(aggrNodeId)
+ .planNode();
+ testPlan(plan, "SELECT c0, array_agg(c1 ORDER BY c1) FROM tmp GROUP BY 1");
+
+ plan = PlanBuilder()
+ .values(vectors)
+ .project({"c0 % 7", "c1"})
+ .singleAggregation({"p0"}, {"array_agg(c1 ORDER BY c1)"}, {})
+ .capturePlanNodeId(aggrNodeId)
+ .planNode();
+ testPlan(
+ plan, "SELECT c0 % 7, array_agg(c1 ORDER BY c1) FROM tmp GROUP BY 1");
}
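checkSpillStats is a helper defined elsewhere in AggregationTest.cpp and is not part of this diff; a minimal sketch of the kind of assertions it makes (hypothetical, for orientation only) would be:

```cpp
// Hypothetical sketch; the real helper lives elsewhere in AggregationTest.cpp.
void checkSpillStats(exec::PlanNodeStats& stats, bool expectedSpill) {
  if (expectedSpill) {
    ASSERT_GT(stats.spilledBytes, 0);
    ASSERT_GT(stats.spilledRows, 0);
  } else {
    ASSERT_EQ(stats.spilledBytes, 0);
  }
}
```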

TEST_F(AggregationTest, distinctSpillWithMemoryLimit) {
Expand Down