Skip to content

Commit

Permalink
Add distinct aggregation fuzzer
Browse files Browse the repository at this point in the history
  • Loading branch information
duanmeng committed Jan 19, 2024
1 parent ec3bb6b commit 8edbab8
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 26 deletions.
2 changes: 2 additions & 0 deletions velox/exec/DistinctAggregations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ std::unique_ptr<DistinctAggregations> DistinctAggregations::create(
case TypeKind::TIMESTAMP:
return std::make_unique<TypedDistinctAggregations<Timestamp>>(
aggregates, inputType, pool);
case TypeKind::VARBINARY:
[[fallthrough]];
case TypeKind::VARCHAR:
return std::make_unique<TypedDistinctAggregations<StringView>>(
aggregates, inputType, pool);
Expand Down
152 changes: 130 additions & 22 deletions velox/exec/fuzzer/AggregationFuzzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ class AggregationFuzzer : public AggregationFuzzerBase {
// Number of iterations using distinct aggregation.
size_t numDistinct{0};

// Number of iterations using aggregations over distinct inputs.
size_t numDistinctInputs{0};

// Number of iterations using window expressions.
size_t numWindow{0};

Expand Down Expand Up @@ -145,7 +148,17 @@ class AggregationFuzzer : public AggregationFuzzerBase {
bool customVerification,
const std::vector<RowVectorPtr>& input,
const std::vector<std::shared_ptr<ResultVerifier>>& customVerifiers,
int32_t maxDrivers = 2);
int32_t maxDrivers = 2,
bool testWithSpilling = true);

// Return 'true' if query plans failed.
bool verifyDistinctAggregation(
const std::vector<std::string>& groupingKeys,
const std::vector<std::string>& aggregates,
const std::vector<std::string>& masks,
const std::vector<RowVectorPtr>& input,
bool customVerification,
const std::vector<std::shared_ptr<ResultVerifier>>& customVerifiers);

static bool hasPartialGroupBy(const core::PlanNodePtr& plan) {
auto partialAgg = core::PlanNode::findFirstNode(
Expand All @@ -167,7 +180,8 @@ class AggregationFuzzer : public AggregationFuzzerBase {
bool customVerification,
const std::vector<std::shared_ptr<ResultVerifier>>& customVerifiers,
const velox::test::ResultOrError& expected,
int32_t maxDrivers = 2) {
int32_t maxDrivers = 2,
bool testWithSpilling = true) {
for (auto i = 0; i < plans.size(); ++i) {
const auto& planWithSplits = plans[i];

Expand All @@ -181,15 +195,17 @@ class AggregationFuzzer : public AggregationFuzzerBase {
expected,
maxDrivers);

LOG(INFO) << "Testing plan #" << i << " with spilling";
testPlan(
planWithSplits,
true /*injectSpill*/,
false /*abandonPartial*/,
customVerification,
customVerifiers,
expected,
maxDrivers);
if (testWithSpilling) {
LOG(INFO) << "Testing plan #" << i << " with spilling";
testPlan(
planWithSplits,
true /*injectSpill*/,
false /*abandonPartial*/,
customVerification,
customVerifiers,
expected,
maxDrivers);
}

if (hasPartialGroupBy(planWithSplits.plan)) {
LOG(INFO) << "Testing plan #" << i
Expand Down Expand Up @@ -310,6 +326,21 @@ bool canSortInputs(const CallableSignature& signature) {
return true;
}

// Returns true if specified aggregate function can be applied to distinct
// inputs.
bool supportsDistinctInputs(const CallableSignature& signature) {
if (!signature.args.empty()) {
return false;
}

const auto& arg = signature.args.at(0);
if (!arg->isComparable()) {
return false;
}

return true;
}

void AggregationFuzzer::go() {
VELOX_CHECK(
FLAGS_steps > 0 || FLAGS_duration_sec > 0,
Expand Down Expand Up @@ -376,7 +407,16 @@ void AggregationFuzzer::go() {
(signature.name.find("approx_") == std::string::npos) &&
vectorFuzzer_.coinToss(0.2);

auto call = makeFunctionCall(signature.name, argNames, sortedInputs);
// Exclude approx_xxx aggregations since their semantics may conflict
// with the semantics of the DistinctAggregation, say, the result of
// approx_percentile(x) with lots of duplicate input may be different
// from approx_percentile(distinct x).
const bool distinctInputs = !sortedInputs &&
(signature.name.find("approx_") == std::string::npos) &&
supportsDistinctInputs(signature) && vectorFuzzer_.coinToss(0.2);

auto call = makeFunctionCall(
signature.name, argNames, sortedInputs, distinctInputs);

// 20% of times use mask.
std::vector<std::string> masks;
Expand All @@ -398,6 +438,10 @@ void AggregationFuzzer::go() {
}

auto input = generateInputData(argNames, argTypes, signature);
std::shared_ptr<ResultVerifier> customVerifier;
if (customVerification) {
customVerifier = customVerificationFunctions_.at(signature.name);
}

if (sortedInputs) {
++stats_.numSortedInputs;
Expand All @@ -406,12 +450,19 @@ void AggregationFuzzer::go() {
if (failed) {
signatureWithStats.second.numFailed++;
}
} else {
std::shared_ptr<ResultVerifier> customVerifier;
if (customVerification) {
customVerifier = customVerificationFunctions_.at(signature.name);
} else if (distinctInputs) {
++stats_.numDistinctInputs;
bool failed = verifyDistinctAggregation(
groupingKeys,
{call},
masks,
input,
customVerification,
{customVerifier});
if (failed) {
signatureWithStats.second.numFailed++;
}

} else {
bool failed = verifyAggregation(
groupingKeys,
{call},
Expand Down Expand Up @@ -747,9 +798,6 @@ bool AggregationFuzzer::verifyAggregation(

// Alternate between using Values and TableScan node.

// Sometimes we generate zero-column input of type ROW({}) or a column of type
// UNKNOWN(). Such data cannot be written to a file and therefore cannot
// be tested with TableScan.
const auto inputRowType = asRowType(input[0]->type());
if (isTableScanSupported(inputRowType) && vectorFuzzer_.coinToss(0.5)) {
auto splits = makeSplits(input, directory->path);
Expand Down Expand Up @@ -985,6 +1033,8 @@ void AggregationFuzzer::Stats::print(size_t numIterations) const {
<< printPercentageStat(numGroupBy, numIterations);
LOG(INFO) << "Total distinct aggregations: "
<< printPercentageStat(numDistinct, numIterations);
LOG(INFO) << "Total aggregations over distinct inputs: "
<< printPercentageStat(numDistinctInputs, numIterations);
LOG(INFO) << "Total aggregations over sorted inputs: "
<< printPercentageStat(numSortedInputs, numIterations);
LOG(INFO) << "Total window expressions: "
Expand All @@ -1005,7 +1055,8 @@ bool AggregationFuzzer::compareEquivalentPlanResults(
bool customVerification,
const std::vector<RowVectorPtr>& input,
const std::vector<std::shared_ptr<ResultVerifier>>& customVerifiers,
int32_t maxDrivers) {
int32_t maxDrivers,
bool testWithSpilling) {
try {
auto firstPlan = plans.at(0).plan;
auto resultOrError = execute(firstPlan);
Expand Down Expand Up @@ -1046,7 +1097,13 @@ bool AggregationFuzzer::compareEquivalentPlanResults(
LOG(INFO) << "Verified results against reference DB";
}

testPlans(plans, customVerification, customVerifiers, resultOrError);
testPlans(
plans,
customVerification,
customVerifiers,
resultOrError,
maxDrivers,
testWithSpilling);

return resultOrError.exceptionPtr != nullptr;
} catch (...) {
Expand All @@ -1057,5 +1114,56 @@ bool AggregationFuzzer::compareEquivalentPlanResults(
}
}

bool AggregationFuzzer::verifyDistinctAggregation(
const std::vector<std::string>& groupingKeys,
const std::vector<std::string>& aggregates,
const std::vector<std::string>& masks,
const std::vector<RowVectorPtr>& input,
bool customVerification,
const std::vector<std::shared_ptr<ResultVerifier>>& customVerifiers) {
const auto firstPlan = PlanBuilder()
.values(input)
.singleAggregation(groupingKeys, aggregates, masks)
.planNode();

if (customVerification) {
initializeVerifiers(firstPlan, customVerifiers, input, groupingKeys);
}

SCOPE_EXIT {
if (customVerification) {
resetCustomVerifiers(customVerifiers);
}
};

// Create all the plans upfront.
std::vector<PlanWithSplits> plans;
plans.push_back({firstPlan, {}});

// Alternate between using Values and TableScan node.

const auto inputRowType = asRowType(input[0]->type());
if (isTableScanSupported(inputRowType) && vectorFuzzer_.coinToss(0.5)) {
auto directory = exec::test::TempDirectoryPath::create();
auto splits = makeSplits(input, directory->path);

plans.push_back(
{PlanBuilder()
.tableScan(inputRowType)
.singleAggregation(groupingKeys, aggregates, masks)
.planNode(),
splits});
}

if (persistAndRunOnce_) {
persistReproInfo(plans, reproPersistPath_);
}

// Distinct aggregation must run single-threaded or data must be partitioned
// on group-by keys among threads.
return compareEquivalentPlanResults(
plans, customVerification, input, customVerifiers, 1, false);
}

} // namespace
} // namespace facebook::velox::exec::test
16 changes: 13 additions & 3 deletions velox/exec/fuzzer/AggregationFuzzerBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,9 @@ void writeToFile(
}
} // namespace

// Sometimes we generate zero-column input of type ROW({}) or a column of type
// UNKNOWN(). Such data cannot be written to a file and therefore cannot
// be tested with TableScan.
bool isTableScanSupported(const TypePtr& type) {
if (type->kind() == TypeKind::ROW && type->size() == 0) {
return false;
Expand Down Expand Up @@ -565,11 +568,18 @@ void printStats(const AggregationFuzzerBase::FunctionsStats& stats) {
std::string makeFunctionCall(
const std::string& name,
const std::vector<std::string>& argNames,
bool sortedInputs) {
bool sortedInputs,
bool distinctInputs) {
std::ostringstream call;
call << name << "(" << folly::join(", ", argNames);
call << name << "(";

const auto args = folly::join(", ", argNames);
if (sortedInputs) {
call << " ORDER BY " << folly::join(", ", argNames);
call << args << " ORDER BY " << args;
} else if (distinctInputs) {
call << "distinct " << args;
} else {
call << args;
}
call << ")";

Expand Down
3 changes: 2 additions & 1 deletion velox/exec/fuzzer/AggregationFuzzerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,8 @@ std::string printPercentageStat(size_t n, size_t total);
std::string makeFunctionCall(
const std::string& name,
const std::vector<std::string>& argNames,
bool sortedInputs);
bool sortedInputs = false,
bool distinctInputs = false);

// Returns a list of column names from c0 to cn.
std::vector<std::string> makeNames(size_t n);
Expand Down

0 comments on commit 8edbab8

Please sign in to comment.