Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add aggregations over distinct inputs to AggregationFuzzer #8328

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions velox/exec/DistinctAggregations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ std::unique_ptr<DistinctAggregations> DistinctAggregations::create(
case TypeKind::TIMESTAMP:
return std::make_unique<TypedDistinctAggregations<Timestamp>>(
aggregates, inputType, pool);
case TypeKind::VARBINARY:
[[fallthrough]];
case TypeKind::VARCHAR:
return std::make_unique<TypedDistinctAggregations<StringView>>(
aggregates, inputType, pool);
Expand Down
155 changes: 133 additions & 22 deletions velox/exec/fuzzer/AggregationFuzzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ class AggregationFuzzer : public AggregationFuzzerBase {
// Number of iterations using distinct aggregation.
size_t numDistinct{0};

// Number of iterations using aggregations over distinct inputs.
size_t numDistinctInputs{0};

// Number of iterations using window expressions.
size_t numWindow{0};

Expand Down Expand Up @@ -145,7 +148,17 @@ class AggregationFuzzer : public AggregationFuzzerBase {
bool customVerification,
const std::vector<RowVectorPtr>& input,
const std::vector<std::shared_ptr<ResultVerifier>>& customVerifiers,
int32_t maxDrivers = 2);
int32_t maxDrivers = 2,
bool testWithSpilling = true);

// Return 'true' if query plans failed.
bool verifyDistinctAggregation(
const std::vector<std::string>& groupingKeys,
const std::vector<std::string>& aggregates,
const std::vector<std::string>& masks,
const std::vector<RowVectorPtr>& input,
bool customVerification,
const std::vector<std::shared_ptr<ResultVerifier>>& customVerifiers);

static bool hasPartialGroupBy(const core::PlanNodePtr& plan) {
auto partialAgg = core::PlanNode::findFirstNode(
Expand All @@ -167,7 +180,8 @@ class AggregationFuzzer : public AggregationFuzzerBase {
bool customVerification,
const std::vector<std::shared_ptr<ResultVerifier>>& customVerifiers,
const velox::test::ResultOrError& expected,
int32_t maxDrivers = 2) {
int32_t maxDrivers = 2,
bool testWithSpilling = true) {
for (auto i = 0; i < plans.size(); ++i) {
const auto& planWithSplits = plans[i];

Expand All @@ -181,15 +195,17 @@ class AggregationFuzzer : public AggregationFuzzerBase {
expected,
maxDrivers);

LOG(INFO) << "Testing plan #" << i << " with spilling";
testPlan(
planWithSplits,
true /*injectSpill*/,
false /*abandonPartial*/,
customVerification,
customVerifiers,
expected,
maxDrivers);
if (testWithSpilling) {
LOG(INFO) << "Testing plan #" << i << " with spilling";
testPlan(
planWithSplits,
true /*injectSpill*/,
false /*abandonPartial*/,
customVerification,
customVerifiers,
expected,
maxDrivers);
}

if (hasPartialGroupBy(planWithSplits.plan)) {
LOG(INFO) << "Testing plan #" << i
Expand Down Expand Up @@ -310,6 +326,21 @@ bool canSortInputs(const CallableSignature& signature) {
return true;
}

// Returns true if specified aggregate function can be applied to distinct
// inputs.
bool supportsDistinctInputs(const CallableSignature& signature) {
if (signature.args.empty()) {
return false;
}

const auto& arg = signature.args.at(0);
if (!arg->isComparable()) {
return false;
}

return true;
}

void AggregationFuzzer::go() {
VELOX_CHECK(
FLAGS_steps > 0 || FLAGS_duration_sec > 0,
Expand Down Expand Up @@ -376,7 +407,18 @@ void AggregationFuzzer::go() {
(signature.name.find("approx_") == std::string::npos) &&
vectorFuzzer_.coinToss(0.2);

auto call = makeFunctionCall(signature.name, argNames, sortedInputs);
// Exclude approx_xxx aggregations since their verifiers may not be able
// to verify the results. The approx_percentile verifier would discard
// the distinct property when calculating the expected result, say the
// expected result of the verifier would be approx_percentile(x), which
// may be different from the actual result of approx_percentile(distinct
// x).
const bool distinctInputs = !sortedInputs &&
(signature.name.find("approx_") == std::string::npos) &&
supportsDistinctInputs(signature) && vectorFuzzer_.coinToss(0.2);

auto call = makeFunctionCall(
signature.name, argNames, sortedInputs, distinctInputs);

// 20% of times use mask.
std::vector<std::string> masks;
Expand All @@ -398,6 +440,10 @@ void AggregationFuzzer::go() {
}

auto input = generateInputData(argNames, argTypes, signature);
std::shared_ptr<ResultVerifier> customVerifier;
if (customVerification) {
duanmeng marked this conversation as resolved.
Show resolved Hide resolved
customVerifier = customVerificationFunctions_.at(signature.name);
}

if (sortedInputs) {
++stats_.numSortedInputs;
Expand All @@ -406,12 +452,19 @@ void AggregationFuzzer::go() {
if (failed) {
signatureWithStats.second.numFailed++;
}
} else {
std::shared_ptr<ResultVerifier> customVerifier;
if (customVerification) {
customVerifier = customVerificationFunctions_.at(signature.name);
} else if (distinctInputs) {
++stats_.numDistinctInputs;
bool failed = verifyDistinctAggregation(
groupingKeys,
{call},
masks,
input,
customVerification,
{customVerifier});
if (failed) {
signatureWithStats.second.numFailed++;
}

} else {
bool failed = verifyAggregation(
groupingKeys,
{call},
Expand Down Expand Up @@ -747,9 +800,6 @@ bool AggregationFuzzer::verifyAggregation(

// Alternate between using Values and TableScan node.

// Sometimes we generate zero-column input of type ROW({}) or a column of type
// UNKNOWN(). Such data cannot be written to a file and therefore cannot
// be tested with TableScan.
const auto inputRowType = asRowType(input[0]->type());
if (isTableScanSupported(inputRowType) && vectorFuzzer_.coinToss(0.5)) {
auto splits = makeSplits(input, directory->path);
Expand Down Expand Up @@ -985,6 +1035,8 @@ void AggregationFuzzer::Stats::print(size_t numIterations) const {
<< printPercentageStat(numGroupBy, numIterations);
LOG(INFO) << "Total distinct aggregations: "
<< printPercentageStat(numDistinct, numIterations);
LOG(INFO) << "Total aggregations over distinct inputs: "
<< printPercentageStat(numDistinctInputs, numIterations);
LOG(INFO) << "Total aggregations over sorted inputs: "
<< printPercentageStat(numSortedInputs, numIterations);
LOG(INFO) << "Total window expressions: "
Expand All @@ -1005,7 +1057,8 @@ bool AggregationFuzzer::compareEquivalentPlanResults(
bool customVerification,
const std::vector<RowVectorPtr>& input,
const std::vector<std::shared_ptr<ResultVerifier>>& customVerifiers,
int32_t maxDrivers) {
int32_t maxDrivers,
bool testWithSpilling) {
try {
auto firstPlan = plans.at(0).plan;
auto resultOrError = execute(firstPlan);
Expand Down Expand Up @@ -1046,7 +1099,13 @@ bool AggregationFuzzer::compareEquivalentPlanResults(
LOG(INFO) << "Verified results against reference DB";
}

testPlans(plans, customVerification, customVerifiers, resultOrError);
testPlans(
plans,
customVerification,
customVerifiers,
resultOrError,
maxDrivers,
testWithSpilling);

return resultOrError.exceptionPtr != nullptr;
} catch (...) {
Expand All @@ -1057,5 +1116,57 @@ bool AggregationFuzzer::compareEquivalentPlanResults(
}
}

bool AggregationFuzzer::verifyDistinctAggregation(
const std::vector<std::string>& groupingKeys,
const std::vector<std::string>& aggregates,
const std::vector<std::string>& masks,
const std::vector<RowVectorPtr>& input,
bool customVerification,
const std::vector<std::shared_ptr<ResultVerifier>>& customVerifiers) {
const auto firstPlan = PlanBuilder()
.values(input)
.singleAggregation(groupingKeys, aggregates, masks)
.planNode();

if (customVerification) {
mbasmanova marked this conversation as resolved.
Show resolved Hide resolved
initializeVerifiers(firstPlan, customVerifiers, input, groupingKeys);
}

SCOPE_EXIT {
if (customVerification) {
resetCustomVerifiers(customVerifiers);
}
};

// Create all the plans upfront.
std::vector<PlanWithSplits> plans;
plans.push_back({firstPlan, {}});

// Alternate between using Values and TableScan node.

std::shared_ptr<exec::test::TempDirectoryPath> directory;
const auto inputRowType = asRowType(input[0]->type());
if (isTableScanSupported(inputRowType) && vectorFuzzer_.coinToss(0.5)) {
directory = exec::test::TempDirectoryPath::create();
auto splits = makeSplits(input, directory->path);

plans.push_back(
{PlanBuilder()
.tableScan(inputRowType)
.singleAggregation(groupingKeys, aggregates, masks)
.planNode(),
splits});
}

if (persistAndRunOnce_) {
persistReproInfo(plans, reproPersistPath_);
}

// Distinct aggregation must run single-threaded or data must be partitioned
// on group-by keys among threads.
return compareEquivalentPlanResults(
plans, customVerification, input, customVerifiers, 1, false);
}

} // namespace
} // namespace facebook::velox::exec::test
16 changes: 13 additions & 3 deletions velox/exec/fuzzer/AggregationFuzzerBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,9 @@ void writeToFile(
}
} // namespace

// Sometimes we generate zero-column input of type ROW({}) or a column of type
// UNKNOWN(). Such data cannot be written to a file and therefore cannot
// be tested with TableScan.
bool isTableScanSupported(const TypePtr& type) {
if (type->kind() == TypeKind::ROW && type->size() == 0) {
return false;
Expand Down Expand Up @@ -565,11 +568,18 @@ void printStats(const AggregationFuzzerBase::FunctionsStats& stats) {
std::string makeFunctionCall(
const std::string& name,
const std::vector<std::string>& argNames,
bool sortedInputs) {
bool sortedInputs,
bool distinctInputs) {
std::ostringstream call;
call << name << "(" << folly::join(", ", argNames);
call << name << "(";

const auto args = folly::join(", ", argNames);
if (sortedInputs) {
call << " ORDER BY " << folly::join(", ", argNames);
call << args << " ORDER BY " << args;
} else if (distinctInputs) {
call << "distinct " << args;
} else {
call << args;
}
call << ")";

Expand Down
3 changes: 2 additions & 1 deletion velox/exec/fuzzer/AggregationFuzzerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,8 @@ std::string printPercentageStat(size_t n, size_t total);
std::string makeFunctionCall(
const std::string& name,
const std::vector<std::string>& argNames,
bool sortedInputs);
bool sortedInputs = false,
bool distinctInputs = false);

// Returns a list of column names from c0 to cn.
std::vector<std::string> makeNames(size_t n);
Expand Down