Skip to content

Commit

Permalink
Support colllation for planner refactory test framework (#5449)
Browse files Browse the repository at this point in the history
close #5432
  • Loading branch information
xzhangxian1008 authored Jul 26, 2022
1 parent da32955 commit 679eda6
Show file tree
Hide file tree
Showing 5 changed files with 412 additions and 36 deletions.
40 changes: 21 additions & 19 deletions dbms/src/Debug/astToExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ DAGColumnInfo toNullableDAGColumnInfo(const DAGColumnInfo & input)
return output;
}

void literalToPB(tipb::Expr * expr, const Field & value, uint32_t collator_id)
void literalToPB(tipb::Expr * expr, const Field & value, int32_t collator_id)
{
DataTypePtr type = applyVisitor(FieldToDataType(), value);
ColumnInfo ci = reverseGetColumnInfo({"", type}, 0, Field(), true);
Expand All @@ -224,7 +224,7 @@ String getFunctionNameForConstantFolding(tipb::Expr * expr)
}


void foldConstant(tipb::Expr * expr, uint32_t collator_id, const Context & context)
void foldConstant(tipb::Expr * expr, int32_t collator_id, const Context & context)
{
if (expr->tp() == tipb::ScalarFunc)
{
Expand Down Expand Up @@ -282,12 +282,12 @@ void foldConstant(tipb::Expr * expr, uint32_t collator_id, const Context & conte
}
}

void functionToPB(const DAGSchema & input, ASTFunction * func, tipb::Expr * expr, uint32_t collator_id, const Context & context);
void functionToPB(const DAGSchema & input, ASTFunction * func, tipb::Expr * expr, int32_t collator_id, const Context & context);

void identifierToPB(const DAGSchema & input, ASTIdentifier * id, tipb::Expr * expr, uint32_t collator_id);
void identifierToPB(const DAGSchema & input, ASTIdentifier * id, tipb::Expr * expr, int32_t collator_id);


void astToPB(const DAGSchema & input, ASTPtr ast, tipb::Expr * expr, uint32_t collator_id, const Context & context)
void astToPB(const DAGSchema & input, ASTPtr ast, tipb::Expr * expr, int32_t collator_id, const Context & context)
{
if (auto * id = typeid_cast<ASTIdentifier *>(ast.get()))
{
Expand All @@ -307,7 +307,7 @@ void astToPB(const DAGSchema & input, ASTPtr ast, tipb::Expr * expr, uint32_t co
}
}

void functionToPB(const DAGSchema & input, ASTFunction * func, tipb::Expr * expr, uint32_t collator_id, const Context & context)
void functionToPB(const DAGSchema & input, ASTFunction * func, tipb::Expr * expr, int32_t collator_id, const Context & context)
{
/// aggregation function is handled in Aggregation, so just treated as a column
auto ft = std::find_if(input.begin(), input.end(), [&](const auto & field) {
Expand Down Expand Up @@ -518,7 +518,7 @@ void functionToPB(const DAGSchema & input, ASTFunction * func, tipb::Expr * expr
foldConstant(expr, collator_id, context);
}

void identifierToPB(const DAGSchema & input, ASTIdentifier * id, tipb::Expr * expr, uint32_t collator_id)
void identifierToPB(const DAGSchema & input, ASTIdentifier * id, tipb::Expr * expr, int32_t collator_id)
{
auto ft = std::find_if(input.begin(), input.end(), [&](const auto & field) {
auto column_name = splitQualifiedName(id->getColumnName());
Expand Down Expand Up @@ -815,7 +815,7 @@ std::pair<String, String> splitQualifiedName(const String & s)

namespace mock
{
bool ExchangeSender::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context)
bool ExchangeSender::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context)
{
tipb_executor->set_tp(tipb::ExecType::TypeExchangeSender);
tipb_executor->set_executor_id(name);
Expand Down Expand Up @@ -856,7 +856,7 @@ bool ExchangeSender::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t col
return children[0]->toTiPBExecutor(child_executor, collator_id, mpp_info, context);
}

bool ExchangeReceiver::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context &)
bool ExchangeReceiver::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context &)
{
tipb_executor->set_tp(tipb::ExecType::TypeExchangeReceiver);
tipb_executor->set_executor_id(name);
Expand Down Expand Up @@ -891,7 +891,7 @@ void TableScan::columnPrune(std::unordered_set<String> & used_columns)
output_schema.erase(std::remove_if(output_schema.begin(), output_schema.end(), [&](const auto & field) { return used_columns.count(field.first) == 0; }),
output_schema.end());
}
bool TableScan::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t, const MPPInfo &, const Context &)
bool TableScan::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t, const MPPInfo &, const Context &)
{
if (table_info.is_partition_table)
{
Expand All @@ -916,7 +916,7 @@ bool TableScan::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t, const M
return true;
}

bool Selection::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context)
bool Selection::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context)
{
tipb_executor->set_tp(tipb::ExecType::TypeSelection);
tipb_executor->set_executor_id(name);
Expand All @@ -938,7 +938,7 @@ void Selection::columnPrune(std::unordered_set<String> & used_columns)
output_schema = children[0]->output_schema;
}

bool TopN::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context)
bool TopN::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context)
{
tipb_executor->set_tp(tipb::ExecType::TypeTopN);
tipb_executor->set_executor_id(name);
Expand Down Expand Up @@ -966,7 +966,7 @@ void TopN::columnPrune(std::unordered_set<String> & used_columns)
output_schema = children[0]->output_schema;
}

bool Limit::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context)
bool Limit::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context)
{
tipb_executor->set_tp(tipb::ExecType::TypeLimit);
tipb_executor->set_executor_id(name);
Expand All @@ -982,7 +982,7 @@ void Limit::columnPrune(std::unordered_set<String> & used_columns)
output_schema = children[0]->output_schema;
}

bool Aggregation::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context)
bool Aggregation::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context)
{
tipb_executor->set_tp(tipb::ExecType::TypeAggregation);
tipb_executor->set_executor_id(name);
Expand Down Expand Up @@ -1127,7 +1127,7 @@ void Aggregation::toMPPSubPlan(size_t & executor_index, const DAGProperties & pr
children[0] = exchange_receiver;
}

bool Project::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context)
bool Project::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context)
{
tipb_executor->set_tp(tipb::ExecType::TypeProjection);
tipb_executor->set_executor_id(name);
Expand Down Expand Up @@ -1251,7 +1251,7 @@ void Join::fillJoinKeyAndFieldType(
const DAGSchema & schema,
tipb::Expr * tipb_key,
tipb::FieldType * tipb_field_type,
uint32_t collator_id)
int32_t collator_id)
{
auto * identifier = typeid_cast<ASTIdentifier *>(key.get());
for (size_t index = 0; index < schema.size(); index++)
Expand All @@ -1273,7 +1273,8 @@ void Join::fillJoinKeyAndFieldType(
}
}
}
bool Join::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context)

bool Join::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context)
{
tipb_executor->set_tp(tipb::ExecType::TypeJoin);
tipb_executor->set_executor_id(name);
Expand Down Expand Up @@ -1304,6 +1305,7 @@ bool Join::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id,
auto * right_child_executor = join->add_children();
return children[1]->toTiPBExecutor(right_child_executor, collator_id, mpp_info, context);
}

void Join::toMPPSubPlan(size_t & executor_index, const DAGProperties & properties, std::unordered_map<String, std::pair<std::shared_ptr<ExchangeReceiver>, std::shared_ptr<ExchangeSender>>> & exchange_map)
{
if (properties.use_broadcast_join)
Expand Down Expand Up @@ -1360,7 +1362,7 @@ void Join::toMPPSubPlan(size_t & executor_index, const DAGProperties & propertie
exchange_map[right_exchange_receiver->name] = std::make_pair(right_exchange_receiver, right_exchange_sender);
}

bool Window::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context)
bool Window::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context)
{
tipb_executor->set_tp(tipb::ExecType::TypeWindow);
tipb_executor->set_executor_id(name);
Expand Down Expand Up @@ -1437,7 +1439,7 @@ bool Window::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id
return children[0]->toTiPBExecutor(children_executor, collator_id, mpp_info, context);
}

bool Sort::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context)
bool Sort::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context)
{
tipb_executor->set_tp(tipb::ExecType::TypeSort);
tipb_executor->set_executor_id(name);
Expand Down
26 changes: 13 additions & 13 deletions dbms/src/Debug/astToExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ struct Executor
{
index_++;
}
virtual bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context)
virtual bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context)
= 0;
virtual void toMPPSubPlan(size_t & executor_index, const DAGProperties & properties, std::unordered_map<String, std::pair<std::shared_ptr<ExchangeReceiver>, std::shared_ptr<ExchangeSender>>> & exchange_map)
{
Expand All @@ -133,7 +133,7 @@ struct ExchangeSender : Executor
, partition_keys(partition_keys_)
{}
void columnPrune(std::unordered_set<String> &) override { throw Exception("Should not reach here"); }
bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;
bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;
};

struct ExchangeReceiver : Executor
Expand All @@ -146,7 +146,7 @@ struct ExchangeReceiver : Executor
, fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count_)
{}
void columnPrune(std::unordered_set<String> &) override { throw Exception("Should not reach here"); }
bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context &) override;
bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context &) override;
};

struct TableScan : public Executor
Expand All @@ -158,7 +158,7 @@ struct TableScan : public Executor
, table_info(table_info_)
{}
void columnPrune(std::unordered_set<String> & used_columns) override;
bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t, const MPPInfo &, const Context &) override;
bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t, const MPPInfo &, const Context &) override;
void toMPPSubPlan(size_t &, const DAGProperties &, std::unordered_map<String, std::pair<std::shared_ptr<ExchangeReceiver>, std::shared_ptr<ExchangeSender>>> &) override
{}

Expand Down Expand Up @@ -190,7 +190,7 @@ struct Selection : public Executor
: Executor(index_, "selection_" + std::to_string(index_), output_schema_)
, conditions(std::move(conditions_))
{}
bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;
bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;
void columnPrune(std::unordered_set<String> & used_columns) override;
};

Expand All @@ -203,7 +203,7 @@ struct TopN : public Executor
, order_columns(std::move(order_columns_))
, limit(limit_)
{}
bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;
bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;
void columnPrune(std::unordered_set<String> & used_columns) override;
};

Expand All @@ -214,7 +214,7 @@ struct Limit : public Executor
: Executor(index_, "limit_" + std::to_string(index_), output_schema_)
, limit(limit_)
{}
bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;
bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;
void columnPrune(std::unordered_set<String> & used_columns) override;
};

Expand All @@ -234,7 +234,7 @@ struct Aggregation : public Executor
, gby_exprs(std::move(gby_exprs_))
, is_final_mode(is_final_mode_)
{}
bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;
bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;
void columnPrune(std::unordered_set<String> & used_columns) override;
void toMPPSubPlan(size_t & executor_index, const DAGProperties & properties, std::unordered_map<String, std::pair<std::shared_ptr<ExchangeReceiver>, std::shared_ptr<ExchangeSender>>> & exchange_map) override;
};
Expand All @@ -246,7 +246,7 @@ struct Project : public Executor
: Executor(index_, "project_" + std::to_string(index_), output_schema_)
, exprs(std::move(exprs_))
{}
bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;
bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;
void columnPrune(std::unordered_set<String> & used_columns) override;
};

Expand All @@ -272,9 +272,9 @@ struct Join : Executor
const DAGSchema & schema,
tipb::Expr * tipb_key,
tipb::FieldType * tipb_field_type,
uint32_t collator_id);
int32_t collator_id);

bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;
bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;

void toMPPSubPlan(size_t & executor_index, const DAGProperties & properties, std::unordered_map<String, std::pair<std::shared_ptr<ExchangeReceiver>, std::shared_ptr<ExchangeSender>>> & exchange_map) override;
};
Expand Down Expand Up @@ -309,7 +309,7 @@ struct Window : Executor
// Currently only use Window Executor in Unit Test which don't call columnPrume.
// TODO: call columnPrune in unit test and further benchmark test to eliminate compute process.
void columnPrune(std::unordered_set<String> &) override { throw Exception("Should not reach here"); }
bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;
bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;
};

struct Sort : Executor
Expand All @@ -328,7 +328,7 @@ struct Sort : Executor
// Currently only use Sort Executor in Unit Test which don't call columnPrume.
// TODO: call columnPrune in unit test and further benchmark test to eliminate compute process.
void columnPrune(std::unordered_set<String> &) override { throw Exception("Should not reach here"); }
bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;
bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;
};
} // namespace mock

Expand Down
Loading

0 comments on commit 679eda6

Please sign in to comment.