Skip to content

Commit

Permalink
Add distinct aggregation fuzzer
Browse files Browse the repository at this point in the history
  • Loading branch information
duanmeng committed Jan 11, 2024
1 parent 3cabe76 commit bb36e24
Show file tree
Hide file tree
Showing 4 changed files with 216 additions and 89 deletions.
2 changes: 2 additions & 0 deletions velox/exec/DistinctAggregations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ std::unique_ptr<DistinctAggregations> DistinctAggregations::create(
case TypeKind::TIMESTAMP:
return std::make_unique<TypedDistinctAggregations<Timestamp>>(
aggregates, inputType, pool);
case TypeKind::VARBINARY:
[[fallthrough]];
case TypeKind::VARCHAR:
return std::make_unique<TypedDistinctAggregations<StringView>>(
aggregates, inputType, pool);
Expand Down
288 changes: 203 additions & 85 deletions velox/exec/fuzzer/AggregationFuzzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,14 @@ class AggregationFuzzer : public AggregationFuzzerBase {
void updateReferenceQueryStats(
AggregationFuzzerBase::ReferenceQueryErrorCode errorCode);

bool checkResult(
const core::PlanNodePtr& plan,
bool customVerification,
const std::vector<RowVectorPtr>& input,
const std::vector<PlanWithSplits>& plans,
const std::vector<std::shared_ptr<ResultVerifier>>& customVerifiers,
int32_t maxDrivers = 2);

// Return 'true' if query plans failed.
bool verifyWindow(
const std::vector<std::string>& partitionKeys,
Expand All @@ -136,6 +144,15 @@ class AggregationFuzzer : public AggregationFuzzerBase {
const std::vector<std::string>& masks,
const std::vector<RowVectorPtr>& input);

// Return 'true' if query plans failed.
bool verifyDistinctAggregation(
const std::vector<std::string>& groupingKeys,
const std::vector<std::string>& aggregates,
const std::vector<std::string>& masks,
const std::vector<RowVectorPtr>& input,
bool customVerification,
const std::vector<std::shared_ptr<ResultVerifier>>& customVerifiers);

void verifyAggregation(const std::vector<PlanWithSplits>& plans);

static bool hasPartialGroupBy(const core::PlanNodePtr& plan) {
Expand Down Expand Up @@ -301,6 +318,22 @@ bool canSortInputs(const CallableSignature& signature) {
return true;
}

// Returns true if specified aggregate function can be applied to distinct
// inputs.
bool canDistinctInputs(const CallableSignature& signature) {
if (signature.args.empty()) {
return false;
}

for (const auto& arg : signature.args) {
if (!arg->isComparable()) {
return false;
}
}

return true;
}

void AggregationFuzzer::go() {
VELOX_CHECK(
FLAGS_steps > 0 || FLAGS_duration_sec > 0,
Expand All @@ -312,19 +345,6 @@ void AggregationFuzzer::go() {
while (!isDone(iteration, startTime)) {
LOG(INFO) << "==============================> Started iteration "
<< iteration << " (seed: " << currentSeed_ << ")";

// 10% of times test distinct aggregation.
if (vectorFuzzer_.coinToss(0.1)) {
++stats_.numDistinct;

std::vector<TypePtr> types;
std::vector<std::string> names;

auto groupingKeys = generateKeys("g", names, types);
auto input = generateInputData(names, types, std::nullopt);

verifyAggregation(groupingKeys, {}, {}, input, false, {});
} else {
// Pick a random signature.
auto signatureWithStats = pickSignature();
signatureWithStats.second.numRuns++;
Expand Down Expand Up @@ -367,7 +387,17 @@ void AggregationFuzzer::go() {
(signature.name.find("approx_") == std::string::npos) &&
vectorFuzzer_.coinToss(0.2);

auto call = makeFunctionCall(signature.name, argNames, sortedInputs);
// Exclude approx_xxx, merge aggregations since it conflicts with
// distinct semantics.
// TODO: Add a exclude list for aggregations that cannot be supported by
// DistinctAggregations.
const bool distinct = !sortedInputs && canDistinctInputs(signature) &&
(signature.name.find("approx_") == std::string::npos) &&
(signature.name.find("merge") == std::string::npos) &&
vectorFuzzer_.coinToss(0.2);

auto call =
makeFunctionCall(signature.name, argNames, sortedInputs, distinct);

// 20% of times use mask.
std::vector<std::string> masks;
Expand All @@ -389,6 +419,10 @@ void AggregationFuzzer::go() {
}

auto input = generateInputData(argNames, argTypes, signature);
std::shared_ptr<ResultVerifier> customVerifier;
if (customVerification) {
customVerifier = customVerificationFunctions_.at(signature.name);
}

if (sortedInputs) {
++stats_.numSortedInputs;
Expand All @@ -397,12 +431,19 @@ void AggregationFuzzer::go() {
if (failed) {
signatureWithStats.second.numFailed++;
}
} else {
std::shared_ptr<ResultVerifier> customVerifier;
if (customVerification) {
customVerifier = customVerificationFunctions_.at(signature.name);
} else if (distinct) {
++stats_.numDistinct;
bool failed = verifyDistinctAggregation(
groupingKeys,
{call},
masks,
input,
customVerification,
{customVerifier});
if (failed) {
signatureWithStats.second.numFailed++;
}

} else {
bool failed = verifyAggregation(
groupingKeys,
{call},
Expand All @@ -415,7 +456,7 @@ void AggregationFuzzer::go() {
}
}
}
}

LOG(INFO) << "==============================> Done with iteration "
<< iteration;

Expand Down Expand Up @@ -675,6 +716,7 @@ bool AggregationFuzzer::verifyWindow(
}
}

namespace {
void resetCustomVerifiers(
const std::vector<std::shared_ptr<ResultVerifier>>& customVerifiers) {
for (auto& verifier : customVerifiers) {
Expand All @@ -684,34 +726,100 @@ void resetCustomVerifiers(
}
}

void initVerifier(
const std::vector<std::shared_ptr<ResultVerifier>>& customVerifiers,
const std::vector<RowVectorPtr>& input,
const std::vector<std::string>& groupingKeys,
const std::shared_ptr<const core::AggregationNode>& aggregationNode) {
for (auto i = 0; i < customVerifiers.size(); ++i) {
auto& verifier = customVerifiers[i];
if (verifier == nullptr) {
continue;
}

verifier->initialize(
input,
groupingKeys,
aggregationNode->aggregates()[i],
aggregationNode->aggregateNames()[i]);
}
}
} // namespace

bool AggregationFuzzer::checkResult(
const core::PlanNodePtr& plan,
bool customVerification,
const std::vector<RowVectorPtr>& input,
const std::vector<PlanWithSplits>& plans,
const std::vector<std::shared_ptr<ResultVerifier>>& customVerifiers,
int32_t maxDrivers) {
try {
auto resultOrError = execute(plan);
if (resultOrError.exceptionPtr) {
++stats_.numFailed;
}

// TODO Use ResultVerifier::compare API to compare Velox results with
// reference DB results once reference query runner is updated to return
// results as Velox vectors.
std::optional<MaterializedRowMultiset> expectedResult;
if (resultOrError.result != nullptr) {
if (!customVerification) {
auto referenceResult = computeReferenceResults(plan, input);
updateReferenceQueryStats(referenceResult.second);
expectedResult = referenceResult.first;
} else {
++stats_.numVerificationSkipped;

for (auto& verifier : customVerifiers) {
if (verifier != nullptr && verifier->supportsVerify()) {
VELOX_CHECK(
verifier->verify(resultOrError.result),
"Aggregation results failed custom verification");
}
}
}
}

if (expectedResult && resultOrError.result) {
++stats_.numVerified;
VELOX_CHECK(
assertEqualResults(
expectedResult.value(),
plan->outputType(),
{resultOrError.result}),
"Velox and reference DB results don't match");
LOG(INFO) << "Verified results against reference DB";
}

testPlans(
plans, customVerification, customVerifiers, resultOrError, maxDrivers);

return resultOrError.exceptionPtr != nullptr;
} catch (...) {
if (!reproPersistPath_.empty()) {
persistReproInfo(plans, reproPersistPath_);
}
throw;
}
}

bool AggregationFuzzer::verifyAggregation(
const std::vector<std::string>& groupingKeys,
const std::vector<std::string>& aggregates,
const std::vector<std::string>& masks,
const std::vector<RowVectorPtr>& input,
bool customVerification,
const std::vector<std::shared_ptr<ResultVerifier>>& customVerifiers) {
auto firstPlan = PlanBuilder()
.values(input)
.singleAggregation(groupingKeys, aggregates, masks)
.planNode();
const auto firstPlan = PlanBuilder()
.values(input)
.singleAggregation(groupingKeys, aggregates, masks)
.planNode();

if (customVerification) {
const auto& aggregationNode =
std::dynamic_pointer_cast<const core::AggregationNode>(firstPlan);

for (auto i = 0; i < customVerifiers.size(); ++i) {
auto& verifier = customVerifiers[i];
if (verifier == nullptr) {
continue;
}

verifier->initialize(
input,
groupingKeys,
aggregationNode->aggregates()[i],
aggregationNode->aggregateNames()[i]);
}
initVerifier(customVerifiers, input, groupingKeys, aggregationNode);
}

SCOPE_EXIT {
Expand Down Expand Up @@ -784,54 +892,8 @@ bool AggregationFuzzer::verifyAggregation(
persistReproInfo(plans, reproPersistPath_);
}

try {
auto resultOrError = execute(firstPlan);
if (resultOrError.exceptionPtr) {
++stats_.numFailed;
}

// TODO Use ResultVerifier::compare API to compare Velox results with
// reference DB results once reference query runner is updated to return
// results as Velox vectors.
std::optional<MaterializedRowMultiset> expectedResult;
if (resultOrError.result != nullptr) {
if (!customVerification) {
auto referenceResult = computeReferenceResults(firstPlan, input);
updateReferenceQueryStats(referenceResult.second);
expectedResult = referenceResult.first;
} else {
++stats_.numVerificationSkipped;

for (auto& verifier : customVerifiers) {
if (verifier != nullptr && verifier->supportsVerify()) {
VELOX_CHECK(
verifier->verify(resultOrError.result),
"Aggregation results failed custom verification");
}
}
}
}

if (expectedResult && resultOrError.result) {
++stats_.numVerified;
VELOX_CHECK(
assertEqualResults(
expectedResult.value(),
firstPlan->outputType(),
{resultOrError.result}),
"Velox and reference DB results don't match");
LOG(INFO) << "Verified results against reference DB";
}

testPlans(plans, customVerification, customVerifiers, resultOrError);

return resultOrError.exceptionPtr != nullptr;
} catch (...) {
if (!reproPersistPath_.empty()) {
persistReproInfo(plans, reproPersistPath_);
}
throw;
}
return checkResult(
firstPlan, customVerification, input, plans, customVerifiers);
}

bool AggregationFuzzer::verifySortedAggregation(
Expand Down Expand Up @@ -915,6 +977,62 @@ bool AggregationFuzzer::verifySortedAggregation(
return resultOrError.exceptionPtr != nullptr;
}

bool AggregationFuzzer::verifyDistinctAggregation(
const std::vector<std::string>& groupingKeys,
const std::vector<std::string>& aggregates,
const std::vector<std::string>& masks,
const std::vector<RowVectorPtr>& input,
bool customVerification,
const std::vector<std::shared_ptr<ResultVerifier>>& customVerifiers) {
const auto firstPlan = PlanBuilder()
.values(input)
.singleAggregation(groupingKeys, aggregates, masks)
.planNode();

if (customVerification) {
const auto& aggregationNode =
std::dynamic_pointer_cast<const core::AggregationNode>(firstPlan);
initVerifier(customVerifiers, input, groupingKeys, aggregationNode);
}

SCOPE_EXIT {
if (customVerification) {
resetCustomVerifiers(customVerifiers);
}
};

// Create all the plans upfront.
std::vector<PlanWithSplits> plans;

auto directory = exec::test::TempDirectoryPath::create();

// Alternate between using Values and TableScan node.

// Sometimes we generate zero-column input of type ROW({}) or a column of type
// UNKNOWN(). Such data cannot be written to a file and therefore cannot
// be tested with TableScan.
const auto inputRowType = asRowType(input[0]->type());
if (isTableScanSupported(inputRowType) && vectorFuzzer_.coinToss(0.5)) {
auto splits = makeSplits(input, directory->path);

plans.push_back(
{PlanBuilder()
.tableScan(inputRowType)
.singleAggregation(groupingKeys, aggregates, masks)
.planNode(),
splits});
}

if (persistAndRunOnce_) {
persistReproInfo(plans, reproPersistPath_);
}

// Distinct aggregations cannot be split into partial and final, hence we
// could ony use single-thread to test the plan.
return checkResult(
firstPlan, customVerification, input, plans, customVerifiers, 1);
}

// verifyAggregation(std::vector<core::PlanNodePtr> plans) is tied to plan
// created by previous verifyAggregation function. Changes in nodes there will
// require corresponding changes here.
Expand Down
Loading

0 comments on commit bb36e24

Please sign in to comment.