Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support colllation for planner refactory test framework #5449

Merged
merged 10 commits into from
Jul 26, 2022
40 changes: 21 additions & 19 deletions dbms/src/Debug/astToExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ DAGColumnInfo toNullableDAGColumnInfo(const DAGColumnInfo & input)
return output;
}

void literalToPB(tipb::Expr * expr, const Field & value, uint32_t collator_id)
void literalToPB(tipb::Expr * expr, const Field & value, int32_t collator_id)
{
DataTypePtr type = applyVisitor(FieldToDataType(), value);
ColumnInfo ci = reverseGetColumnInfo({"", type}, 0, Field(), true);
Expand All @@ -224,7 +224,7 @@ String getFunctionNameForConstantFolding(tipb::Expr * expr)
}


void foldConstant(tipb::Expr * expr, uint32_t collator_id, const Context & context)
void foldConstant(tipb::Expr * expr, int32_t collator_id, const Context & context)
{
if (expr->tp() == tipb::ScalarFunc)
{
Expand Down Expand Up @@ -282,12 +282,12 @@ void foldConstant(tipb::Expr * expr, uint32_t collator_id, const Context & conte
}
}

void functionToPB(const DAGSchema & input, ASTFunction * func, tipb::Expr * expr, uint32_t collator_id, const Context & context);
void functionToPB(const DAGSchema & input, ASTFunction * func, tipb::Expr * expr, int32_t collator_id, const Context & context);

void identifierToPB(const DAGSchema & input, ASTIdentifier * id, tipb::Expr * expr, uint32_t collator_id);
void identifierToPB(const DAGSchema & input, ASTIdentifier * id, tipb::Expr * expr, int32_t collator_id);


void astToPB(const DAGSchema & input, ASTPtr ast, tipb::Expr * expr, uint32_t collator_id, const Context & context)
void astToPB(const DAGSchema & input, ASTPtr ast, tipb::Expr * expr, int32_t collator_id, const Context & context)
{
if (auto * id = typeid_cast<ASTIdentifier *>(ast.get()))
{
Expand All @@ -307,7 +307,7 @@ void astToPB(const DAGSchema & input, ASTPtr ast, tipb::Expr * expr, uint32_t co
}
}

void functionToPB(const DAGSchema & input, ASTFunction * func, tipb::Expr * expr, uint32_t collator_id, const Context & context)
void functionToPB(const DAGSchema & input, ASTFunction * func, tipb::Expr * expr, int32_t collator_id, const Context & context)
{
/// aggregation function is handled in Aggregation, so just treated as a column
auto ft = std::find_if(input.begin(), input.end(), [&](const auto & field) {
Expand Down Expand Up @@ -518,7 +518,7 @@ void functionToPB(const DAGSchema & input, ASTFunction * func, tipb::Expr * expr
foldConstant(expr, collator_id, context);
}

void identifierToPB(const DAGSchema & input, ASTIdentifier * id, tipb::Expr * expr, uint32_t collator_id)
void identifierToPB(const DAGSchema & input, ASTIdentifier * id, tipb::Expr * expr, int32_t collator_id)
{
auto ft = std::find_if(input.begin(), input.end(), [&](const auto & field) {
auto column_name = splitQualifiedName(id->getColumnName());
Expand Down Expand Up @@ -815,7 +815,7 @@ std::pair<String, String> splitQualifiedName(const String & s)

namespace mock
{
bool ExchangeSender::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context)
bool ExchangeSender::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context)
{
tipb_executor->set_tp(tipb::ExecType::TypeExchangeSender);
tipb_executor->set_executor_id(name);
Expand Down Expand Up @@ -856,7 +856,7 @@ bool ExchangeSender::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t col
return children[0]->toTiPBExecutor(child_executor, collator_id, mpp_info, context);
}

bool ExchangeReceiver::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context &)
bool ExchangeReceiver::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context &)
{
tipb_executor->set_tp(tipb::ExecType::TypeExchangeReceiver);
tipb_executor->set_executor_id(name);
Expand Down Expand Up @@ -891,7 +891,7 @@ void TableScan::columnPrune(std::unordered_set<String> & used_columns)
output_schema.erase(std::remove_if(output_schema.begin(), output_schema.end(), [&](const auto & field) { return used_columns.count(field.first) == 0; }),
output_schema.end());
}
bool TableScan::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t, const MPPInfo &, const Context &)
bool TableScan::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t, const MPPInfo &, const Context &)
{
if (table_info.is_partition_table)
{
Expand All @@ -916,7 +916,7 @@ bool TableScan::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t, const M
return true;
}

bool Selection::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context)
bool Selection::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context)
{
tipb_executor->set_tp(tipb::ExecType::TypeSelection);
tipb_executor->set_executor_id(name);
Expand All @@ -938,7 +938,7 @@ void Selection::columnPrune(std::unordered_set<String> & used_columns)
output_schema = children[0]->output_schema;
}

bool TopN::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context)
bool TopN::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context)
{
tipb_executor->set_tp(tipb::ExecType::TypeTopN);
tipb_executor->set_executor_id(name);
Expand Down Expand Up @@ -966,7 +966,7 @@ void TopN::columnPrune(std::unordered_set<String> & used_columns)
output_schema = children[0]->output_schema;
}

bool Limit::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context)
bool Limit::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context)
{
tipb_executor->set_tp(tipb::ExecType::TypeLimit);
tipb_executor->set_executor_id(name);
Expand All @@ -982,7 +982,7 @@ void Limit::columnPrune(std::unordered_set<String> & used_columns)
output_schema = children[0]->output_schema;
}

bool Aggregation::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context)
bool Aggregation::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context)
{
tipb_executor->set_tp(tipb::ExecType::TypeAggregation);
tipb_executor->set_executor_id(name);
Expand Down Expand Up @@ -1127,7 +1127,7 @@ void Aggregation::toMPPSubPlan(size_t & executor_index, const DAGProperties & pr
children[0] = exchange_receiver;
}

bool Project::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context)
bool Project::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context)
{
tipb_executor->set_tp(tipb::ExecType::TypeProjection);
tipb_executor->set_executor_id(name);
Expand Down Expand Up @@ -1251,7 +1251,7 @@ void Join::fillJoinKeyAndFieldType(
const DAGSchema & schema,
tipb::Expr * tipb_key,
tipb::FieldType * tipb_field_type,
uint32_t collator_id)
int32_t collator_id)
{
auto * identifier = typeid_cast<ASTIdentifier *>(key.get());
for (size_t index = 0; index < schema.size(); index++)
Expand All @@ -1273,7 +1273,8 @@ void Join::fillJoinKeyAndFieldType(
}
}
}
bool Join::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context)

bool Join::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context)
{
tipb_executor->set_tp(tipb::ExecType::TypeJoin);
tipb_executor->set_executor_id(name);
Expand Down Expand Up @@ -1304,6 +1305,7 @@ bool Join::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id,
auto * right_child_executor = join->add_children();
return children[1]->toTiPBExecutor(right_child_executor, collator_id, mpp_info, context);
}

void Join::toMPPSubPlan(size_t & executor_index, const DAGProperties & properties, std::unordered_map<String, std::pair<std::shared_ptr<ExchangeReceiver>, std::shared_ptr<ExchangeSender>>> & exchange_map)
{
if (properties.use_broadcast_join)
Expand Down Expand Up @@ -1360,7 +1362,7 @@ void Join::toMPPSubPlan(size_t & executor_index, const DAGProperties & propertie
exchange_map[right_exchange_receiver->name] = std::make_pair(right_exchange_receiver, right_exchange_sender);
}

bool Window::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context)
bool Window::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context)
{
tipb_executor->set_tp(tipb::ExecType::TypeWindow);
tipb_executor->set_executor_id(name);
Expand Down Expand Up @@ -1437,7 +1439,7 @@ bool Window::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id
return children[0]->toTiPBExecutor(children_executor, collator_id, mpp_info, context);
}

bool Sort::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context)
bool Sort::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context)
{
tipb_executor->set_tp(tipb::ExecType::TypeSort);
tipb_executor->set_executor_id(name);
Expand Down
26 changes: 13 additions & 13 deletions dbms/src/Debug/astToExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ struct Executor
{
index_++;
}
virtual bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context)
virtual bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context)
= 0;
virtual void toMPPSubPlan(size_t & executor_index, const DAGProperties & properties, std::unordered_map<String, std::pair<std::shared_ptr<ExchangeReceiver>, std::shared_ptr<ExchangeSender>>> & exchange_map)
{
Expand All @@ -133,7 +133,7 @@ struct ExchangeSender : Executor
, partition_keys(partition_keys_)
{}
void columnPrune(std::unordered_set<String> &) override { throw Exception("Should not reach here"); }
bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;
bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;
};

struct ExchangeReceiver : Executor
Expand All @@ -146,7 +146,7 @@ struct ExchangeReceiver : Executor
, fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count_)
{}
void columnPrune(std::unordered_set<String> &) override { throw Exception("Should not reach here"); }
bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context &) override;
bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context &) override;
};

struct TableScan : public Executor
Expand All @@ -158,7 +158,7 @@ struct TableScan : public Executor
, table_info(table_info_)
{}
void columnPrune(std::unordered_set<String> & used_columns) override;
bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t, const MPPInfo &, const Context &) override;
bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t, const MPPInfo &, const Context &) override;
void toMPPSubPlan(size_t &, const DAGProperties &, std::unordered_map<String, std::pair<std::shared_ptr<ExchangeReceiver>, std::shared_ptr<ExchangeSender>>> &) override
{}

Expand Down Expand Up @@ -190,7 +190,7 @@ struct Selection : public Executor
: Executor(index_, "selection_" + std::to_string(index_), output_schema_)
, conditions(std::move(conditions_))
{}
bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;
bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;
void columnPrune(std::unordered_set<String> & used_columns) override;
};

Expand All @@ -203,7 +203,7 @@ struct TopN : public Executor
, order_columns(std::move(order_columns_))
, limit(limit_)
{}
bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;
bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;
void columnPrune(std::unordered_set<String> & used_columns) override;
};

Expand All @@ -214,7 +214,7 @@ struct Limit : public Executor
: Executor(index_, "limit_" + std::to_string(index_), output_schema_)
, limit(limit_)
{}
bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;
bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;
void columnPrune(std::unordered_set<String> & used_columns) override;
};

Expand All @@ -234,7 +234,7 @@ struct Aggregation : public Executor
, gby_exprs(std::move(gby_exprs_))
, is_final_mode(is_final_mode_)
{}
bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;
bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;
void columnPrune(std::unordered_set<String> & used_columns) override;
void toMPPSubPlan(size_t & executor_index, const DAGProperties & properties, std::unordered_map<String, std::pair<std::shared_ptr<ExchangeReceiver>, std::shared_ptr<ExchangeSender>>> & exchange_map) override;
};
Expand All @@ -246,7 +246,7 @@ struct Project : public Executor
: Executor(index_, "project_" + std::to_string(index_), output_schema_)
, exprs(std::move(exprs_))
{}
bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;
bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;
void columnPrune(std::unordered_set<String> & used_columns) override;
};

Expand All @@ -272,9 +272,9 @@ struct Join : Executor
const DAGSchema & schema,
tipb::Expr * tipb_key,
tipb::FieldType * tipb_field_type,
uint32_t collator_id);
int32_t collator_id);

bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;
bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;

void toMPPSubPlan(size_t & executor_index, const DAGProperties & properties, std::unordered_map<String, std::pair<std::shared_ptr<ExchangeReceiver>, std::shared_ptr<ExchangeSender>>> & exchange_map) override;
};
Expand Down Expand Up @@ -309,7 +309,7 @@ struct Window : Executor
// Currently only use Window Executor in Unit Test which don't call columnPrume.
// TODO: call columnPrune in unit test and further benchmark test to eliminate compute process.
void columnPrune(std::unordered_set<String> &) override { throw Exception("Should not reach here"); }
bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;
bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;
};

struct Sort : Executor
Expand All @@ -328,7 +328,7 @@ struct Sort : Executor
// Currently only use Sort Executor in Unit Test which don't call columnPrume.
// TODO: call columnPrune in unit test and further benchmark test to eliminate compute process.
void columnPrune(std::unordered_set<String> &) override { throw Exception("Should not reach here"); }
bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;
bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;
};
} // namespace mock

Expand Down
Loading