Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Simplify join executor translation #5453

Merged
merged 20 commits into from
Jul 29, 2022
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 115 additions & 61 deletions dbms/src/Debug/astToExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -797,19 +797,20 @@ std::pair<String, String> splitQualifiedName(const String & s)
{
std::pair<String, String> ret;
Poco::StringTokenizer string_tokens(s, ".");
if (string_tokens.count() == 1)

switch (string_tokens.count())
{
case 1:
ret.second = s;
}
else if (string_tokens.count() == 2)
{
break;
case 2:
ret.first = string_tokens[0];
ret.second = string_tokens[1];
break;
default:
throw Exception("Invalid identifier name " + s);
}
else
{
throw Exception("Invalid identifier name");
}

return ret;
}

Expand Down Expand Up @@ -891,6 +892,7 @@ void TableScan::columnPrune(std::unordered_set<String> & used_columns)
output_schema.erase(std::remove_if(output_schema.begin(), output_schema.end(), [&](const auto & field) { return used_columns.count(field.first) == 0; }),
output_schema.end());
}

bool TableScan::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t, const MPPInfo &, const Context &)
{
if (table_info.is_partition_table)
Expand Down Expand Up @@ -929,6 +931,7 @@ bool Selection::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_
auto * child_executor = sel->mutable_child();
return children[0]->toTiPBExecutor(child_executor, collator_id, mpp_info, context);
}

void Selection::columnPrune(std::unordered_set<String> & used_columns)
{
for (auto & expr : conditions)
Expand Down Expand Up @@ -957,6 +960,7 @@ bool TopN::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, c
auto * child_executor = topn->mutable_child();
return children[0]->toTiPBExecutor(child_executor, collator_id, mpp_info, context);
}

void TopN::columnPrune(std::unordered_set<String> & used_columns)
{
for (auto & expr : order_columns)
Expand All @@ -975,6 +979,7 @@ bool Limit::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id,
auto * child_executor = lt->mutable_child();
return children[0]->toTiPBExecutor(child_executor, collator_id, mpp_info, context);
}

void Limit::columnPrune(std::unordered_set<String> & used_columns)
{
children[0]->columnPrune(used_columns);
Expand Down Expand Up @@ -1049,6 +1054,7 @@ bool Aggregation::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collato
auto * child_executor = agg->mutable_child();
return children[0]->toTiPBExecutor(child_executor, collator_id, mpp_info, context);
}

void Aggregation::columnPrune(std::unordered_set<String> & used_columns)
{
/// output schema for partial agg is the original agg's output schema
Expand All @@ -1075,6 +1081,7 @@ void Aggregation::columnPrune(std::unordered_set<String> & used_columns)
}
children[0]->columnPrune(used_input_columns);
}

void Aggregation::toMPPSubPlan(size_t & executor_index, const DAGProperties & properties, std::unordered_map<String, std::pair<std::shared_ptr<ExchangeReceiver>, std::shared_ptr<ExchangeSender>>> & exchange_map)
{
if (!is_final_mode)
Expand Down Expand Up @@ -1155,6 +1162,7 @@ bool Project::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id
auto * children_executor = proj->mutable_child();
return children[0]->toTiPBExecutor(children_executor, collator_id, mpp_info, context);
}

void Project::columnPrune(std::unordered_set<String> & used_columns)
{
output_schema.erase(std::remove_if(output_schema.begin(), output_schema.end(), [&](const auto & field) { return used_columns.count(field.first) == 0; }),
Expand Down Expand Up @@ -1185,21 +1193,24 @@ void Join::columnPrune(std::unordered_set<String> & used_columns)
{
std::unordered_set<String> left_columns;
std::unordered_set<String> right_columns;

for (auto & field : children[0]->output_schema)
left_columns.emplace(field.first);
for (auto & field : children[1]->output_schema)
right_columns.emplace(field.first);

std::unordered_set<String> left_used_columns;
std::unordered_set<String> right_used_columns;

for (const auto & s : used_columns)
{
if (left_columns.find(s) != left_columns.end())
left_used_columns.emplace(s);
else
right_used_columns.emplace(s);
}
for (const auto & child : join_params.using_expression_list->children)

for (const auto & child : using_expr_list->children)
{
if (auto * identifier = typeid_cast<ASTIdentifier *>(child.get()))
{
Expand All @@ -1226,20 +1237,23 @@ void Join::columnPrune(std::unordered_set<String> & used_columns)
throw Exception("Only support Join on columns");
}
}

children[0]->columnPrune(left_used_columns);
children[1]->columnPrune(right_used_columns);
output_schema.clear();

/// update output schema
output_schema.clear();
for (auto & field : children[0]->output_schema)
{
if (join_params.kind == ASTTableJoin::Kind::Right && field.second.hasNotNullFlag())
if (tp == tipb::TypeRightOuterJoin && field.second.hasNotNullFlag())
output_schema.push_back(toNullableDAGColumnInfo(field));
else
output_schema.push_back(field);
}

for (auto & field : children[1]->output_schema)
{
if (join_params.kind == ASTTableJoin::Kind::Left && field.second.hasNotNullFlag())
if (tp == tipb::TypeLeftOuterJoin && field.second.hasNotNullFlag())
output_schema.push_back(toNullableDAGColumnInfo(field));
else
output_schema.push_back(field);
Expand All @@ -1248,18 +1262,19 @@ void Join::columnPrune(std::unordered_set<String> & used_columns)

void Join::fillJoinKeyAndFieldType(
ASTPtr key,
const DAGSchema & schema,
const DAGSchema & child_schema,
tipb::Expr * tipb_key,
tipb::FieldType * tipb_field_type,
int32_t collator_id)
{
auto * identifier = typeid_cast<ASTIdentifier *>(key.get());
for (size_t index = 0; index < schema.size(); index++)
for (size_t index = 0; index < child_schema.size(); ++index)
{
const auto & field = schema[index];
if (splitQualifiedName(field.first).second == identifier->getColumnName())
const auto & [col_name, col_info] = child_schema[index];

if (splitQualifiedName(col_name).second == identifier->getColumnName())
{
auto tipb_type = TiDB::columnInfoToFieldType(field.second);
auto tipb_type = TiDB::columnInfoToFieldType(col_info);
tipb_type.set_collate(collator_id);

tipb_key->set_tp(tipb::ColumnRef);
Expand All @@ -1278,28 +1293,19 @@ bool Join::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, c
{
tipb_executor->set_tp(tipb::ExecType::TypeJoin);
tipb_executor->set_executor_id(name);

tipb::Join * join = tipb_executor->mutable_join();
switch (join_params.kind) // todo support more type...
{
case ASTTableJoin::Kind::Inner:
join->set_join_type(tipb::JoinType::TypeInnerJoin);
break;
case ASTTableJoin::Kind::Left:
join->set_join_type(tipb::JoinType::TypeLeftOuterJoin);
break;
case ASTTableJoin::Kind::Right:
join->set_join_type(tipb::JoinType::TypeRightOuterJoin);
break;
default:
throw Exception("Unsupported join type");
}

join->set_join_type(tp);
join->set_join_exec_type(tipb::JoinExecType::TypeHashJoin);
join->set_inner_idx(1);
for (auto & key : join_params.using_expression_list->children)

for (auto & key : using_expr_list->children)
{
fillJoinKeyAndFieldType(key, children[0]->output_schema, join->add_left_join_keys(), join->add_probe_types(), collator_id);
fillJoinKeyAndFieldType(key, children[1]->output_schema, join->add_right_join_keys(), join->add_build_types(), collator_id);
}

auto * left_child_executor = join->add_children();
children[0]->toTiPBExecutor(left_child_executor, collator_id, mpp_info, context);
auto * right_child_executor = join->add_children();
Expand All @@ -1321,29 +1327,27 @@ void Join::toMPPSubPlan(size_t & executor_index, const DAGProperties & propertie
exchange_map[right_exchange_receiver->name] = std::make_pair(right_exchange_receiver, right_exchange_sender);
return;
}

std::vector<size_t> left_partition_keys;
std::vector<size_t> right_partition_keys;
for (auto & key : join_params.using_expression_list->children)
{
size_t index = 0;
for (; index < children[0]->output_schema.size(); index++)
{
if (splitQualifiedName(children[0]->output_schema[index].first).second == key->getColumnName())
{
left_partition_keys.push_back(index);
break;
}
}
index = 0;
for (; index < children[1]->output_schema.size(); index++)

auto push_back_partition_key = [](auto & partition_keys, const auto & child_schema, const auto & key) {
for (size_t index = 0; index < child_schema.size(); ++index)
{
if (splitQualifiedName(children[1]->output_schema[index].first).second == key->getColumnName())
if (splitQualifiedName(child_schema[index].first).second == key->getColumnName())
{
right_partition_keys.push_back(index);
partition_keys.push_back(index);
break;
}
}
};

for (auto & key : using_expr_list->children)
{
push_back_partition_key(left_partition_keys, children[0]->output_schema, key);
push_back_partition_key(right_partition_keys, children[1]->output_schema, key);
}

std::shared_ptr<ExchangeSender> left_exchange_sender
= std::make_shared<ExchangeSender>(executor_index, children[0]->output_schema, tipb::Hash, left_partition_keys);
left_exchange_sender->children.push_back(children[0]);
Expand Down Expand Up @@ -1648,30 +1652,80 @@ ExecutorPtr compileProject(ExecutorPtr input, size_t & executor_index, ASTPtr se
return project;
}

ExecutorPtr compileJoin(size_t & executor_index, ExecutorPtr left, ExecutorPtr right, ASTPtr params)
static void buildLeftSideJoinSchema(DAGSchema & schema, const DAGSchema & left_schema, tipb::JoinType tp)
{
DAGSchema output_schema;
const auto & join_params = (static_cast<const ASTTableJoin &>(*params));
for (auto & field : left->output_schema)
for (const auto & field : left_schema)
{
if (join_params.kind == ASTTableJoin::Kind::Right && field.second.hasNotNullFlag())
output_schema.push_back(toNullableDAGColumnInfo(field));
if (tp == tipb::JoinType::TypeRightOuterJoin && field.second.hasNotNullFlag())
schema.push_back(toNullableDAGColumnInfo(field));
else
output_schema.push_back(field);
schema.push_back(field);
}
for (auto & field : right->output_schema)
{
if (join_params.kind == ASTTableJoin::Kind::Left && field.second.hasNotNullFlag())
output_schema.push_back(toNullableDAGColumnInfo(field));
else
output_schema.push_back(field);
}

static void buildRightSideJoinSchema(DAGSchema & schema, const DAGSchema & right_schema, tipb::JoinType tp)
{
/// Note: for semi join, the right table column is ignored
/// but for (anti) left outer semi join, a 1/0 (uint8) field is pushed back
/// indicating whether right table has matching row(s), see comment in ASTTableJoin::Kind for details.
if (tp == tipb::JoinType::TypeLeftOuterSemiJoin || tp == tipb::JoinType::TypeAntiLeftOuterSemiJoin)
{
tipb::FieldType field_type{};
field_type.set_tp(TiDB::TypeTiny);
field_type.set_charset("binary");
field_type.set_collate(TiDB::ITiDBCollator::BINARY);
field_type.set_flag(0);
field_type.set_flen(-1);
field_type.set_decimal(-1);
schema.push_back(std::make_pair("", TiDB::fieldTypeToColumnInfo(field_type)));
}
else if (tp != tipb::JoinType::TypeSemiJoin && tp != tipb::JoinType::TypeAntiSemiJoin)
{
for (const auto & field : right_schema)
{
if (tp == tipb::JoinType::TypeLeftOuterJoin && field.second.hasNotNullFlag())
schema.push_back(toNullableDAGColumnInfo(field));
else
schema.push_back(field);
}
}
auto join = std::make_shared<mock::Join>(executor_index, output_schema, params);
}

ExecutorPtr compileJoin(size_t & executor_index, ExecutorPtr left, ExecutorPtr right, tipb::JoinType tp, ASTPtr using_expr_list)
{
DAGSchema output_schema;

buildLeftSideJoinSchema(output_schema, left->output_schema, tp);
buildRightSideJoinSchema(output_schema, right->output_schema, tp);

auto join = std::make_shared<mock::Join>(executor_index, output_schema, tp, using_expr_list);
join->children.push_back(left);
join->children.push_back(right);

return join;
}

ExecutorPtr compileJoin(size_t & executor_index, ExecutorPtr left, ExecutorPtr right, ASTPtr params)
ywqzzy marked this conversation as resolved.
Show resolved Hide resolved
{
tipb::JoinType tp;
const auto & ast_join = (static_cast<const ASTTableJoin &>(*params));
switch (ast_join.kind)
{
case ASTTableJoin::Kind::Inner:
tp = tipb::JoinType::TypeInnerJoin;
break;
case ASTTableJoin::Kind::Left:
tp = tipb::JoinType::TypeLeftOuterJoin;
break;
case ASTTableJoin::Kind::Right:
tp = tipb::JoinType::TypeRightOuterJoin;
break;
default:
throw Exception("Unsupported join type");
}
return compileJoin(executor_index, left, right, tp, ast_join.using_expression_list);
}

ExecutorPtr compileExchangeSender(ExecutorPtr input, size_t & executor_index, tipb::ExchangeType exchange_type)
{
ExecutorPtr exchange_sender = std::make_shared<mock::ExchangeSender>(executor_index, input->output_schema, exchange_type);
Expand Down
29 changes: 21 additions & 8 deletions dbms/src/Debug/astToExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <Storages/IManageableStorage.h>
#include <Storages/MutableSupport.h>
#include <Storages/Transaction/Types.h>
#include <tipb/executor.pb.h>
#include <tipb/select.pb.h>

#include <optional>
Expand Down Expand Up @@ -252,17 +253,20 @@ struct Project : public Executor

struct Join : Executor
{
ASTPtr params;
const ASTTableJoin & join_params;
Join(size_t & index_, const DAGSchema & output_schema_, ASTPtr params_)
tipb::JoinType tp;

const ASTPtr using_expr_list;
ywqzzy marked this conversation as resolved.
Show resolved Hide resolved

// todo(ljr): support on expr
const ASTPtr on_expr{};

Join(size_t & index_, const DAGSchema & output_schema_, tipb::JoinType tp_, ASTPtr using_expr_list_)
: Executor(index_, "Join_" + std::to_string(index_), output_schema_)
, params(params_)
, join_params(static_cast<const ASTTableJoin &>(*params))
, tp(tp_)
, using_expr_list(using_expr_list_)
{
if (join_params.using_expression_list == nullptr)
if (using_expr_list == nullptr)
throw Exception("No join condition found.");
if (join_params.strictness != ASTTableJoin::Strictness::All)
throw Exception("Only support join with strictness ALL");
}

void columnPrune(std::unordered_set<String> & used_columns) override;
Expand Down Expand Up @@ -346,8 +350,17 @@ ExecutorPtr compileAggregation(ExecutorPtr input, size_t & executor_index, ASTPt

ExecutorPtr compileProject(ExecutorPtr input, size_t & executor_index, ASTPtr select_list);

/// Note: this api is only used by legacy test framework for compatibility purpose, which will be depracated soon,
/// so please avoid using it.
/// Old executor test framework bases on ch's parser to translate sql string to ast tree, then manually to DAGRequest.
/// However, as for join executor, this translation, from ASTTableJoin to tipb::Join, is not a one-to-one mapping
/// because of the different join classification model used by these two structures. Therefore, under old test framework,
/// it is hard to fully test join executor. New framework aims to directly construct DAGRequest, so new framework APIs for join should
/// avoid using ASTTableJoin.
ExecutorPtr compileJoin(size_t & executor_index, ExecutorPtr left, ExecutorPtr right, ASTPtr params);

ExecutorPtr compileJoin(size_t & executor_index, ExecutorPtr left, ExecutorPtr right, tipb::JoinType tp, ASTPtr using_expr_list);

ExecutorPtr compileExchangeSender(ExecutorPtr input, size_t & executor_index, tipb::ExchangeType exchange_type);

ExecutorPtr compileExchangeReceiver(size_t & executor_index, DAGSchema schema, uint64_t fine_grained_shuffle_stream_count = 0);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/JoinInterpreterHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ struct TiFlashJoin
/// @other_filter_column_name: column name of `and(other_cond1, other_cond2, ...)`
/// @other_eq_filter_from_in_column_name: column name of `and(other_eq_cond1_from_in, other_eq_cond2_from_in, ...)`
/// such as
/// `select * from t where col1 in (select col2 from t2 where t1.col2 = t2.col3)`
/// `select * from t1 where col1 in (select col2 from t2 where t1.col2 = t2.col3)`
/// - other_filter is `t1.col2 = t2.col3`
/// - other_eq_filter_from_in_column is `t1.col1 = t2.col2`
///
Expand Down
Loading