Skip to content

Commit

Permalink
add PrestoQueryRunner/DuckDbQueryRunner supports for join fuzzer
Browse files Browse the repository at this point in the history
  • Loading branch information
yanngyoung committed Jun 7, 2024
1 parent b3f53f6 commit b177132
Show file tree
Hide file tree
Showing 15 changed files with 567 additions and 216 deletions.
6 changes: 5 additions & 1 deletion velox/docs/develop/testing/join-fuzzer.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ combined with randomly generated payload. When generating the join plan node,
fuzzer shuffles join keys and output columns and randomly drops some columns
from the output.

The fuzzer runs the query plan and compares the results with DuckDB.
The fuzzer runs the query plan and compares the results with the reference (DuckDB or Presto) as the expected result.

The fuzzer then generates a set of different but logically equivalent plans,
runs them and verifies that results are the same. Each plan runs twice: with
Expand Down Expand Up @@ -65,4 +65,8 @@ Here is a full list of supported command line arguments.

* ``--enable_spill``: Whether to test with spilling or not. Default is true.

* ``--arbitrator_capacity``: Arbitrator capacity in bytes. Default is 6L << 30.

* ``--allocator_capacity``: Allocator capacity in bytes. Default is 8L << 30.

If running from CLion IDE, add ``--logtostderr=1`` to see the full output.
4 changes: 4 additions & 0 deletions velox/docs/develop/testing/row-number-fuzzer.rst
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,8 @@ Here is a full list of supported command line arguments.

* ``--req_timeout_ms`` Timeout in milliseconds of an HTTP request to the PrestoQueryRunner.

* ``--arbitrator_capacity``: Arbitrator capacity in bytes. Default is 6L << 30.

* ``--allocator_capacity``: Allocator capacity in bytes. Default is 8L << 30.

If running from CLion IDE, add ``--logtostderr=1`` to see the full output.
140 changes: 140 additions & 0 deletions velox/exec/fuzzer/DuckQueryRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,17 @@ std::multiset<std::vector<velox::variant>> DuckQueryRunner::execute(
return queryRunner.execute(sql, resultType);
}

std::multiset<std::vector<velox::variant>> DuckQueryRunner::execute(
const std::string& sql,
const std::vector<RowVectorPtr>& probeInput,
const std::vector<RowVectorPtr>& buildInput,
const RowTypePtr& resultType) {
DuckDbQueryRunner queryRunner;
queryRunner.createTable("t", probeInput);
queryRunner.createTable("u", buildInput);
return queryRunner.execute(sql, resultType);
}

std::optional<std::string> DuckQueryRunner::toSql(
const core::PlanNodePtr& plan) {
if (!isSupported(plan->outputType())) {
Expand Down Expand Up @@ -153,6 +164,16 @@ std::optional<std::string> DuckQueryRunner::toSql(
return toSql(rowNumberNode);
}

if (const auto joinNode =
std::dynamic_pointer_cast<const core::HashJoinNode>(plan)) {
return toSql(joinNode);
}

if (const auto joinNode =
std::dynamic_pointer_cast<const core::NestedLoopJoinNode>(plan)) {
return toSql(joinNode);
}

VELOX_NYI();
}

Expand Down Expand Up @@ -329,4 +350,123 @@ std::optional<std::string> DuckQueryRunner::toSql(

return sql.str();
}

std::optional<std::string> DuckQueryRunner::toSql(
const std::shared_ptr<const core::HashJoinNode>& joinNode) {
const auto& joinKeysToSql = [](auto keys) {
std::stringstream out;
for (auto i = 0; i < keys.size(); ++i) {
if (i > 0) {
out << ", ";
}
out << keys[i]->name();
}
return out.str();
};

const auto& equiClausesToSql = [](auto joinNode) {
std::stringstream out;
for (auto i = 0; i < joinNode->leftKeys().size(); ++i) {
if (i > 0) {
out << " AND ";
}
out << joinNode->leftKeys()[i]->name() << " = "
<< joinNode->rightKeys()[i]->name();
}
return out.str();
};

const auto& outputNames = joinNode->outputType()->names();

std::stringstream sql;
if (joinNode->isLeftSemiProjectJoin()) {
sql << "SELECT "
<< folly::join(", ", outputNames.begin(), --outputNames.end());
} else {
sql << "SELECT " << folly::join(", ", outputNames);
}

switch (joinNode->joinType()) {
case core::JoinType::kInner:
sql << " FROM t INNER JOIN u ON " << equiClausesToSql(joinNode);
break;
case core::JoinType::kLeft:
sql << " FROM t LEFT JOIN u ON " << equiClausesToSql(joinNode);
break;
case core::JoinType::kFull:
sql << " FROM t FULL OUTER JOIN u ON " << equiClausesToSql(joinNode);
break;
case core::JoinType::kLeftSemiFilter:
if (joinNode->leftKeys().size() > 1) {
return std::nullopt;
}
sql << " FROM t WHERE " << joinKeysToSql(joinNode->leftKeys())
<< " IN (SELECT " << joinKeysToSql(joinNode->rightKeys())
<< " FROM u)";
break;
case core::JoinType::kLeftSemiProject:
if (joinNode->isNullAware()) {
sql << ", " << joinKeysToSql(joinNode->leftKeys()) << " IN (SELECT "
<< joinKeysToSql(joinNode->rightKeys()) << " FROM u) FROM t";
} else {
sql << ", EXISTS (SELECT * FROM u WHERE " << equiClausesToSql(joinNode)
<< ") FROM t";
}
break;
case core::JoinType::kAnti:
if (joinNode->isNullAware()) {
sql << " FROM t WHERE " << joinKeysToSql(joinNode->leftKeys())
<< " NOT IN (SELECT " << joinKeysToSql(joinNode->rightKeys())
<< " FROM u)";
} else {
sql << " FROM t WHERE NOT EXISTS (SELECT * FROM u WHERE "
<< equiClausesToSql(joinNode) << ")";
}
break;
default:
VELOX_UNREACHABLE(
"Unknown join type: {}", static_cast<int>(joinNode->joinType()));
}

return sql.str();
}

std::optional<std::string> DuckQueryRunner::toSql(
const std::shared_ptr<const core::NestedLoopJoinNode>& joinNode) {
const auto& joinKeysToSql = [](auto keys) {
std::stringstream out;
for (auto i = 0; i < keys.size(); ++i) {
if (i > 0) {
out << ", ";
}
out << keys[i]->name();
}
return out.str();
};

const auto& outputNames = joinNode->outputType()->names();
std::stringstream sql;

// Nested loop join without filter.
VELOX_CHECK(
joinNode->joinCondition() == nullptr,
"This code path should be called only for nested loop join without filter");
const std::string joinCondition{"(1 = 1)"};
switch (joinNode->joinType()) {
case core::JoinType::kInner:
sql << " FROM t INNER JOIN u ON " << joinCondition;
break;
case core::JoinType::kLeft:
sql << " FROM t LEFT JOIN u ON " << joinCondition;
break;
case core::JoinType::kFull:
sql << " FROM t FULL OUTER JOIN u ON " << joinCondition;
break;
default:
VELOX_UNREACHABLE(
"Unknown join type: {}", static_cast<int>(joinNode->joinType()));
}

return sql.str();
}
} // namespace facebook::velox::exec::test
12 changes: 12 additions & 0 deletions velox/exec/fuzzer/DuckQueryRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ class DuckQueryRunner : public ReferenceQueryRunner {
const std::vector<RowVectorPtr>& input,
const RowTypePtr& resultType) override;

std::multiset<std::vector<velox::variant>> execute(
const std::string& sql,
const std::vector<RowVectorPtr>& probeInput,
const std::vector<RowVectorPtr>& buildInput,
const RowTypePtr& resultType) override;

private:
std::optional<std::string> toSql(
const std::shared_ptr<const core::AggregationNode>& aggregationNode);
Expand All @@ -52,6 +58,12 @@ class DuckQueryRunner : public ReferenceQueryRunner {
std::optional<std::string> toSql(
const std::shared_ptr<const core::RowNumberNode>& rowNumberNode);

std::optional<std::string> toSql(
const std::shared_ptr<const core::HashJoinNode>& joinNode);

std::optional<std::string> toSql(
const std::shared_ptr<const core::NestedLoopJoinNode>& joinNode);

std::unordered_set<std::string> aggregateFunctionNames_;
};

Expand Down
14 changes: 14 additions & 0 deletions velox/exec/fuzzer/FuzzerUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
#include "velox/exec/fuzzer/FuzzerUtil.h"
#include <filesystem>
#include "velox/common/memory/SharedArbitrator.h"
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/dwio/dwrf/reader/DwrfReader.h"
Expand Down Expand Up @@ -146,4 +147,17 @@ bool containsUnsupportedTypes(const TypePtr& type) {
containsTypeKind(type, TypeKind::VARBINARY) ||
containsType(type, INTERVAL_DAY_TIME());
}

void setupMemory(int64_t allocatorCapacity, int64_t arbitratorCapacity) {
FLAGS_velox_enable_memory_usage_track_in_default_memory_pool = true;
FLAGS_velox_memory_leak_check_enabled = true;
facebook::velox::memory::SharedArbitrator::registerFactory();
facebook::velox::memory::MemoryManagerOptions options;
options.allocatorCapacity = allocatorCapacity;
options.arbitratorCapacity = arbitratorCapacity;
options.arbitratorKind = "SHARED";
options.checkUsageLeak = true;
options.arbitrationStateCheckCb = memoryArbitrationStateCheck;
facebook::velox::memory::MemoryManager::initialize(options);
}
} // namespace facebook::velox::exec::test
3 changes: 3 additions & 0 deletions velox/exec/fuzzer/FuzzerUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,7 @@ RowTypePtr concat(const RowTypePtr& a, const RowTypePtr& b);
///
/// TODO Investigate mismatches reported when comparing Varbinary.
bool containsUnsupportedTypes(const TypePtr& type);

// Invoked to set up memory system with arbitration.
void setupMemory(int64_t allocatorCapacity, int64_t arbitratorCapacity);
} // namespace facebook::velox::exec::test
Loading

0 comments on commit b177132

Please sign in to comment.