From 46d246531bcf79f8fb8e1710b57bb76ea6509f5e Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Fri, 22 Jul 2022 12:59:40 +0800 Subject: [PATCH 1/7] submit --- dbms/src/Flash/tests/gtest_collation.cpp | 97 ++++++++++++++++++++++++ dbms/src/TestUtils/mockExecutor.cpp | 4 +- dbms/src/TestUtils/mockExecutor.h | 15 +++- 3 files changed, 111 insertions(+), 5 deletions(-) create mode 100644 dbms/src/Flash/tests/gtest_collation.cpp diff --git a/dbms/src/Flash/tests/gtest_collation.cpp b/dbms/src/Flash/tests/gtest_collation.cpp new file mode 100644 index 00000000000..52548285af4 --- /dev/null +++ b/dbms/src/Flash/tests/gtest_collation.cpp @@ -0,0 +1,97 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include +#include + +namespace DB +{ +namespace tests +{ + +class ExecutorCollation : public DB::tests::ExecutorTest +{ +public: + using ColStringNullableType = std::optional::FieldType>; + using ColUInt64Type = typename TypeTraits::FieldType; + + using ColumnWithNullableString = std::vector; + using ColumnWithUInt64 = std::vector; + + void initializeContext() override + { + ExecutorTest::initializeContext(); + + context.addMockTable({db_name, table_name}, + {{col_name, TiDB::TP::TypeString}}, + {toNullableVec(col_name, col)}); + + context.addMockTable({db_name, chinese_table}, + {{chinese_col_name, TiDB::TP::TypeString}}, + {toNullableVec(chinese_col_name, chinese_col)}); + } + + /// Prepare some names + const String db_name{"test_db"}; + const String table_name{"collation_table"}; + const String col_name{"col"}; + const ColumnWithNullableString col{"china", "china", "china ", "CHINA", "cHiNa ", "usa", "usa", "usa ", "USA", "USA "}; + + const String chinese_table{"chinese"}; + const String chinese_col_name{"col"}; + const ColumnWithNullableString chinese_col{"北京", "北京 ", "北bei京", "北Bei京", "北bei京 ", "上海", "上海 ", "shanghai ", "ShangHai", "ShangHai "}; +}; + +/// Guarantee that test framework has correctly supported the collation. 
+TEST_F(ExecutorCollation, Verification) +try +{ + std::shared_ptr request; + { + /// Test default collation(utf8mb4_bin) + request = context.scan(db_name, table_name).aggregation(MockAstVec{}, {col(col_name)}).project({col_name}).build(context); + ASSERT_COLUMNS_EQ_UR(ColumnsWithTypeAndName{toNullableVec(col_name, ColumnWithNullableString{"usa", "CHINA", "USA", "china", "cHiNa "})}, executeStreams(request, 1)); + + request = context.scan(db_name, chinese_table).aggregation(MockAstVec{}, {col(chinese_col_name)}).project({chinese_col_name}).build(context); + ASSERT_COLUMNS_EQ_UR(ColumnsWithTypeAndName{toNullableVec(chinese_col_name, ColumnWithNullableString{"ShangHai", "北京", "北Bei京", "shanghai ", "北bei京", "上海"})}, executeStreams(request, 1)); + } + + { + /// Test utf8mb4_general_ci + context.setCollation(TiDB::ITiDBCollator::UTF8_GENERAL_CI); + request = context.scan(db_name, table_name).aggregation(MockAstVec{}, {col(col_name)}).project({col_name}).build(context); + ASSERT_COLUMNS_EQ_UR(ColumnsWithTypeAndName{toNullableVec(col_name, ColumnWithNullableString{"usa", "china"})}, executeStreams(request, 1)); + + request = context.scan(db_name, chinese_table).aggregation(MockAstVec{}, {col(chinese_col_name)}).project({chinese_col_name}).build(context); + ASSERT_COLUMNS_EQ_UR(ColumnsWithTypeAndName{toNullableVec(chinese_col_name, ColumnWithNullableString{"北京", "shanghai ", "北bei京", "上海"})}, executeStreams(request, 1)); + } + + { + /// Test utf8_bin + context.setCollation(TiDB::ITiDBCollator::UTF8_BIN); + request = context.scan(db_name, table_name).aggregation(MockAstVec{}, {col(col_name)}).project({col_name}).build(context); + ASSERT_COLUMNS_EQ_UR(ColumnsWithTypeAndName{toNullableVec(col_name, ColumnWithNullableString{"USA", "CHINA", "usa", "china", "cHiNa "})}, executeStreams(request, 1)); + } + + { + /// Test utf8_unicode_CI + context.setCollation(TiDB::ITiDBCollator::UTF8_UNICODE_CI); + request = context.scan(db_name, table_name).aggregation(MockAstVec{}, 
{col(col_name)}).project({col_name}).build(context); + ASSERT_COLUMNS_EQ_UR(ColumnsWithTypeAndName{toNullableVec(col_name, ColumnWithNullableString{"china", "usa"})}, executeStreams(request, 1)); + } +} +CATCH + +} // namespace tests +} // namespace DB diff --git a/dbms/src/TestUtils/mockExecutor.cpp b/dbms/src/TestUtils/mockExecutor.cpp index cc8160761e6..93431c26758 100644 --- a/dbms/src/TestUtils/mockExecutor.cpp +++ b/dbms/src/TestUtils/mockExecutor.cpp @@ -385,7 +385,7 @@ void MockDAGRequestContext::addExchangeReceiver(const String & name, MockColumnI DAGRequestBuilder MockDAGRequestContext::scan(String db_name, String table_name) { - auto builder = DAGRequestBuilder(index).mockTable({db_name, table_name}, mock_tables[db_name + "." + table_name]); + auto builder = DAGRequestBuilder(index, collation).mockTable({db_name, table_name}, mock_tables[db_name + "." + table_name]); // If don't have related columns, user must pass input columns as argument of executeStreams in order to run Executors Tests. // If user don't want to test executors, it will be safe to run Interpreter Tests. if (mock_table_columns.find(db_name + "." + table_name) != mock_table_columns.end()) @@ -397,7 +397,7 @@ DAGRequestBuilder MockDAGRequestContext::scan(String db_name, String table_name) DAGRequestBuilder MockDAGRequestContext::receive(String exchange_name, uint64_t fine_grained_shuffle_stream_count) { - auto builder = DAGRequestBuilder(index).exchangeReceiver(exchange_schemas[exchange_name], fine_grained_shuffle_stream_count); + auto builder = DAGRequestBuilder(index, collation).exchangeReceiver(exchange_schemas[exchange_name], fine_grained_shuffle_stream_count); receiver_source_task_ids_map[builder.getRoot()->name] = {}; // If don't have related columns, user must pass input columns as argument of executeStreams in order to run Executors Tests. // If user don't want to test executors, it will be safe to run Interpreter Tests. 
diff --git a/dbms/src/TestUtils/mockExecutor.h b/dbms/src/TestUtils/mockExecutor.h index 317b2e362fb..444ba979935 100644 --- a/dbms/src/TestUtils/mockExecutor.h +++ b/dbms/src/TestUtils/mockExecutor.h @@ -20,6 +20,7 @@ #include #include #include +#include namespace DB::tests { @@ -52,9 +53,10 @@ class DAGRequestBuilder return executor_index; } - explicit DAGRequestBuilder(size_t & index) + explicit DAGRequestBuilder(size_t & index, Int32 collator = TiDB::ITiDBCollator::UTF8MB4_BIN) : executor_index(index) { + properties.collator = -abs(collator); } ExecutorPtr getRoot() @@ -101,6 +103,9 @@ class DAGRequestBuilder DAGRequestBuilder & sort(MockOrderByItem order_by, bool is_partial_sort, uint64_t fine_grained_shuffle_stream_count = 0); DAGRequestBuilder & sort(MockOrderByItemVec order_by_vec, bool is_partial_sort, uint64_t fine_grained_shuffle_stream_count = 0); + void setCollation(Int32 collator_) { properties.collator = -abs(collator_); } + Int32 getCollation() const { return abs(properties.collator); } + private: void initDAGRequest(tipb::DAGRequest & dag_request); DAGRequestBuilder & buildAggregation(ASTPtr agg_funcs, ASTPtr group_by_exprs); @@ -117,8 +122,8 @@ class DAGRequestBuilder class MockDAGRequestContext { public: - explicit MockDAGRequestContext(Context context_) - : context(context_) + explicit MockDAGRequestContext(Context context_, Int32 collation_ = TiDB::ITiDBCollator::UTF8MB4_BIN) + : context(context_), collation(-abs(collation_)) { index = 0; } @@ -143,6 +148,9 @@ class MockDAGRequestContext DAGRequestBuilder scan(String db_name, String table_name); DAGRequestBuilder receive(String exchange_name, uint64_t fine_grained_shuffle_stream_count = 0); + void setCollation(Int32 collation_) { collation = -abs(collation_); } + Int32 getCollation() const { return abs(collation); } + private: size_t index; std::unordered_map mock_tables; @@ -157,6 +165,7 @@ class MockDAGRequestContext // In TiFlash, we use task_id to identify an Mpp Task. 
std::unordered_map> receiver_source_task_ids_map; Context context; + Int32 collation; }; ASTPtr buildColumn(const String & column_name); From 7e36a551d417e94827246233e5e2f889fa1adf02 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Fri, 22 Jul 2022 13:17:34 +0800 Subject: [PATCH 2/7] foramt --- dbms/src/Flash/tests/gtest_collation.cpp | 2 +- dbms/src/TestUtils/mockExecutor.h | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/dbms/src/Flash/tests/gtest_collation.cpp b/dbms/src/Flash/tests/gtest_collation.cpp index 52548285af4..d10e14687a1 100644 --- a/dbms/src/Flash/tests/gtest_collation.cpp +++ b/dbms/src/Flash/tests/gtest_collation.cpp @@ -62,7 +62,7 @@ try /// Test default collation(utf8mb4_bin) request = context.scan(db_name, table_name).aggregation(MockAstVec{}, {col(col_name)}).project({col_name}).build(context); ASSERT_COLUMNS_EQ_UR(ColumnsWithTypeAndName{toNullableVec(col_name, ColumnWithNullableString{"usa", "CHINA", "USA", "china", "cHiNa "})}, executeStreams(request, 1)); - + request = context.scan(db_name, chinese_table).aggregation(MockAstVec{}, {col(chinese_col_name)}).project({chinese_col_name}).build(context); ASSERT_COLUMNS_EQ_UR(ColumnsWithTypeAndName{toNullableVec(chinese_col_name, ColumnWithNullableString{"ShangHai", "北京", "北Bei京", "shanghai ", "北bei京", "上海"})}, executeStreams(request, 1)); } diff --git a/dbms/src/TestUtils/mockExecutor.h b/dbms/src/TestUtils/mockExecutor.h index 444ba979935..0ef1e4d2ad6 100644 --- a/dbms/src/TestUtils/mockExecutor.h +++ b/dbms/src/TestUtils/mockExecutor.h @@ -19,8 +19,8 @@ #include #include #include -#include #include +#include namespace DB::tests { @@ -123,7 +123,8 @@ class MockDAGRequestContext { public: explicit MockDAGRequestContext(Context context_, Int32 collation_ = TiDB::ITiDBCollator::UTF8MB4_BIN) - : context(context_), collation(-abs(collation_)) + : context(context_) + , collation(-abs(collation_)) { index = 0; } @@ -150,7 +151,7 @@ class MockDAGRequestContext void 
setCollation(Int32 collation_) { collation = -abs(collation_); } Int32 getCollation() const { return abs(collation); } - + private: size_t index; std::unordered_map mock_tables; From 4c7ce386b1f2b21b2bc86cabcb5ae9f9edf84237 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Fri, 22 Jul 2022 16:17:28 +0800 Subject: [PATCH 3/7] simplify --- dbms/src/Flash/tests/gtest_collation.cpp | 47 +++++++++--------------- 1 file changed, 17 insertions(+), 30 deletions(-) diff --git a/dbms/src/Flash/tests/gtest_collation.cpp b/dbms/src/Flash/tests/gtest_collation.cpp index d10e14687a1..6cb748f2b46 100644 --- a/dbms/src/Flash/tests/gtest_collation.cpp +++ b/dbms/src/Flash/tests/gtest_collation.cpp @@ -42,6 +42,13 @@ class ExecutorCollation : public DB::tests::ExecutorTest {toNullableVec(chinese_col_name, chinese_col)}); } + void setAndCheck(const String & table_name, const String & col_name, Int32 collation, const ColumnsWithTypeAndName & expect) + { + context.setCollation(collation); + auto request = context.scan(db_name, table_name).aggregation(MockAstVec{}, {col(col_name)}).project({col_name}).build(context); + ASSERT_COLUMNS_EQ_UR(expect, executeStreams(request, 1)); + } + /// Prepare some names const String db_name{"test_db"}; const String table_name{"collation_table"}; @@ -57,39 +64,19 @@ class ExecutorCollation : public DB::tests::ExecutorTest TEST_F(ExecutorCollation, Verification) try { - std::shared_ptr request; - { - /// Test default collation(utf8mb4_bin) - request = context.scan(db_name, table_name).aggregation(MockAstVec{}, {col(col_name)}).project({col_name}).build(context); - ASSERT_COLUMNS_EQ_UR(ColumnsWithTypeAndName{toNullableVec(col_name, ColumnWithNullableString{"usa", "CHINA", "USA", "china", "cHiNa "})}, executeStreams(request, 1)); + /// Test utf8mb4_bin + setAndCheck(table_name, col_name, TiDB::ITiDBCollator::UTF8MB4_BIN, ColumnsWithTypeAndName{toNullableVec(col_name, ColumnWithNullableString{"usa", "CHINA", "USA", "china", "cHiNa "})}); + 
setAndCheck(chinese_table, chinese_col_name, TiDB::ITiDBCollator::UTF8MB4_BIN, ColumnsWithTypeAndName{toNullableVec(chinese_col_name, ColumnWithNullableString{"ShangHai", "北京", "北Bei京", "shanghai ", "北bei京", "上海"})}); - request = context.scan(db_name, chinese_table).aggregation(MockAstVec{}, {col(chinese_col_name)}).project({chinese_col_name}).build(context); - ASSERT_COLUMNS_EQ_UR(ColumnsWithTypeAndName{toNullableVec(chinese_col_name, ColumnWithNullableString{"ShangHai", "北京", "北Bei京", "shanghai ", "北bei京", "上海"})}, executeStreams(request, 1)); - } + /// Test utf8mb4_general_ci + setAndCheck(table_name, col_name, TiDB::ITiDBCollator::UTF8_GENERAL_CI, ColumnsWithTypeAndName{toNullableVec(col_name, ColumnWithNullableString{"usa", "china"})}); + setAndCheck(chinese_table, chinese_col_name, TiDB::ITiDBCollator::UTF8_GENERAL_CI, ColumnsWithTypeAndName{toNullableVec(chinese_col_name, ColumnWithNullableString{"北京", "shanghai ", "北bei京", "上海"})}); - { - /// Test utf8mb4_general_ci - context.setCollation(TiDB::ITiDBCollator::UTF8_GENERAL_CI); - request = context.scan(db_name, table_name).aggregation(MockAstVec{}, {col(col_name)}).project({col_name}).build(context); - ASSERT_COLUMNS_EQ_UR(ColumnsWithTypeAndName{toNullableVec(col_name, ColumnWithNullableString{"usa", "china"})}, executeStreams(request, 1)); + /// Test utf8_bin + setAndCheck(table_name, col_name, TiDB::ITiDBCollator::UTF8_BIN, ColumnsWithTypeAndName{toNullableVec(col_name, ColumnWithNullableString{"USA", "CHINA", "usa", "china", "cHiNa "})}); - request = context.scan(db_name, chinese_table).aggregation(MockAstVec{}, {col(chinese_col_name)}).project({chinese_col_name}).build(context); - ASSERT_COLUMNS_EQ_UR(ColumnsWithTypeAndName{toNullableVec(chinese_col_name, ColumnWithNullableString{"北京", "shanghai ", "北bei京", "上海"})}, executeStreams(request, 1)); - } - - { - /// Test utf8_bin - context.setCollation(TiDB::ITiDBCollator::UTF8_BIN); - request = context.scan(db_name, table_name).aggregation(MockAstVec{}, 
{col(col_name)}).project({col_name}).build(context); - ASSERT_COLUMNS_EQ_UR(ColumnsWithTypeAndName{toNullableVec(col_name, ColumnWithNullableString{"USA", "CHINA", "usa", "china", "cHiNa "})}, executeStreams(request, 1)); - } - - { - /// Test utf8_unicode_CI - context.setCollation(TiDB::ITiDBCollator::UTF8_UNICODE_CI); - request = context.scan(db_name, table_name).aggregation(MockAstVec{}, {col(col_name)}).project({col_name}).build(context); - ASSERT_COLUMNS_EQ_UR(ColumnsWithTypeAndName{toNullableVec(col_name, ColumnWithNullableString{"china", "usa"})}, executeStreams(request, 1)); - } + /// Test utf8_unicode_CI + setAndCheck(table_name, col_name, TiDB::ITiDBCollator::UTF8_UNICODE_CI, ColumnsWithTypeAndName{toNullableVec(col_name, ColumnWithNullableString{"china", "usa"})}); } CATCH From 7cbdff72ab641619f058742e67fccb9d8e6789e0 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Mon, 25 Jul 2022 15:47:23 +0800 Subject: [PATCH 4/7] update --- dbms/src/Debug/astToExecutor.cpp | 40 +-- dbms/src/Debug/astToExecutor.h | 26 +- dbms/src/Flash/tests/gtest_collation.cpp | 351 +++++++++++++++++++++++ 3 files changed, 385 insertions(+), 32 deletions(-) diff --git a/dbms/src/Debug/astToExecutor.cpp b/dbms/src/Debug/astToExecutor.cpp index 61f4474f919..beddd18c37b 100644 --- a/dbms/src/Debug/astToExecutor.cpp +++ b/dbms/src/Debug/astToExecutor.cpp @@ -204,7 +204,7 @@ DAGColumnInfo toNullableDAGColumnInfo(const DAGColumnInfo & input) return output; } -void literalToPB(tipb::Expr * expr, const Field & value, uint32_t collator_id) +void literalToPB(tipb::Expr * expr, const Field & value, int32_t collator_id) { DataTypePtr type = applyVisitor(FieldToDataType(), value); ColumnInfo ci = reverseGetColumnInfo({"", type}, 0, Field(), true); @@ -224,7 +224,7 @@ String getFunctionNameForConstantFolding(tipb::Expr * expr) } -void foldConstant(tipb::Expr * expr, uint32_t collator_id, const Context & context) +void foldConstant(tipb::Expr * expr, int32_t collator_id, const Context & context) 
{ if (expr->tp() == tipb::ScalarFunc) { @@ -282,12 +282,12 @@ void foldConstant(tipb::Expr * expr, uint32_t collator_id, const Context & conte } } -void functionToPB(const DAGSchema & input, ASTFunction * func, tipb::Expr * expr, uint32_t collator_id, const Context & context); +void functionToPB(const DAGSchema & input, ASTFunction * func, tipb::Expr * expr, int32_t collator_id, const Context & context); -void identifierToPB(const DAGSchema & input, ASTIdentifier * id, tipb::Expr * expr, uint32_t collator_id); +void identifierToPB(const DAGSchema & input, ASTIdentifier * id, tipb::Expr * expr, int32_t collator_id); -void astToPB(const DAGSchema & input, ASTPtr ast, tipb::Expr * expr, uint32_t collator_id, const Context & context) +void astToPB(const DAGSchema & input, ASTPtr ast, tipb::Expr * expr, int32_t collator_id, const Context & context) { if (auto * id = typeid_cast(ast.get())) { @@ -307,7 +307,7 @@ void astToPB(const DAGSchema & input, ASTPtr ast, tipb::Expr * expr, uint32_t co } } -void functionToPB(const DAGSchema & input, ASTFunction * func, tipb::Expr * expr, uint32_t collator_id, const Context & context) +void functionToPB(const DAGSchema & input, ASTFunction * func, tipb::Expr * expr, int32_t collator_id, const Context & context) { /// aggregation function is handled in Aggregation, so just treated as a column auto ft = std::find_if(input.begin(), input.end(), [&](const auto & field) { @@ -518,7 +518,7 @@ void functionToPB(const DAGSchema & input, ASTFunction * func, tipb::Expr * expr foldConstant(expr, collator_id, context); } -void identifierToPB(const DAGSchema & input, ASTIdentifier * id, tipb::Expr * expr, uint32_t collator_id) +void identifierToPB(const DAGSchema & input, ASTIdentifier * id, tipb::Expr * expr, int32_t collator_id) { auto ft = std::find_if(input.begin(), input.end(), [&](const auto & field) { auto column_name = splitQualifiedName(id->getColumnName()); @@ -815,7 +815,7 @@ std::pair splitQualifiedName(const String & s) namespace 
mock { -bool ExchangeSender::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) +bool ExchangeSender::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) { tipb_executor->set_tp(tipb::ExecType::TypeExchangeSender); tipb_executor->set_executor_id(name); @@ -856,7 +856,7 @@ bool ExchangeSender::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t col return children[0]->toTiPBExecutor(child_executor, collator_id, mpp_info, context); } -bool ExchangeReceiver::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context &) +bool ExchangeReceiver::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context &) { tipb_executor->set_tp(tipb::ExecType::TypeExchangeReceiver); tipb_executor->set_executor_id(name); @@ -891,7 +891,7 @@ void TableScan::columnPrune(std::unordered_set & used_columns) output_schema.erase(std::remove_if(output_schema.begin(), output_schema.end(), [&](const auto & field) { return used_columns.count(field.first) == 0; }), output_schema.end()); } -bool TableScan::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t, const MPPInfo &, const Context &) +bool TableScan::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t, const MPPInfo &, const Context &) { if (table_info.is_partition_table) { @@ -916,7 +916,7 @@ bool TableScan::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t, const M return true; } -bool Selection::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) +bool Selection::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) { tipb_executor->set_tp(tipb::ExecType::TypeSelection); tipb_executor->set_executor_id(name); @@ -938,7 +938,7 @@ void 
Selection::columnPrune(std::unordered_set & used_columns) output_schema = children[0]->output_schema; } -bool TopN::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) +bool TopN::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) { tipb_executor->set_tp(tipb::ExecType::TypeTopN); tipb_executor->set_executor_id(name); @@ -966,7 +966,7 @@ void TopN::columnPrune(std::unordered_set & used_columns) output_schema = children[0]->output_schema; } -bool Limit::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) +bool Limit::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) { tipb_executor->set_tp(tipb::ExecType::TypeLimit); tipb_executor->set_executor_id(name); @@ -982,7 +982,7 @@ void Limit::columnPrune(std::unordered_set & used_columns) output_schema = children[0]->output_schema; } -bool Aggregation::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) +bool Aggregation::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) { tipb_executor->set_tp(tipb::ExecType::TypeAggregation); tipb_executor->set_executor_id(name); @@ -1127,7 +1127,7 @@ void Aggregation::toMPPSubPlan(size_t & executor_index, const DAGProperties & pr children[0] = exchange_receiver; } -bool Project::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) +bool Project::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) { tipb_executor->set_tp(tipb::ExecType::TypeProjection); tipb_executor->set_executor_id(name); @@ -1251,7 +1251,7 @@ void Join::fillJoinKeyAndFieldType( const 
DAGSchema & schema, tipb::Expr * tipb_key, tipb::FieldType * tipb_field_type, - uint32_t collator_id) + int32_t collator_id) { auto * identifier = typeid_cast(key.get()); for (size_t index = 0; index < schema.size(); index++) @@ -1273,7 +1273,8 @@ void Join::fillJoinKeyAndFieldType( } } } -bool Join::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) + +bool Join::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) { tipb_executor->set_tp(tipb::ExecType::TypeJoin); tipb_executor->set_executor_id(name); @@ -1304,6 +1305,7 @@ bool Join::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, auto * right_child_executor = join->add_children(); return children[1]->toTiPBExecutor(right_child_executor, collator_id, mpp_info, context); } + void Join::toMPPSubPlan(size_t & executor_index, const DAGProperties & properties, std::unordered_map, std::shared_ptr>> & exchange_map) { if (properties.use_broadcast_join) @@ -1360,7 +1362,7 @@ void Join::toMPPSubPlan(size_t & executor_index, const DAGProperties & propertie exchange_map[right_exchange_receiver->name] = std::make_pair(right_exchange_receiver, right_exchange_sender); } -bool Window::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) +bool Window::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) { tipb_executor->set_tp(tipb::ExecType::TypeWindow); tipb_executor->set_executor_id(name); @@ -1437,7 +1439,7 @@ bool Window::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id return children[0]->toTiPBExecutor(children_executor, collator_id, mpp_info, context); } -bool Sort::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) +bool 
Sort::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) { tipb_executor->set_tp(tipb::ExecType::TypeSort); tipb_executor->set_executor_id(name); diff --git a/dbms/src/Debug/astToExecutor.h b/dbms/src/Debug/astToExecutor.h index f39f4059d26..ecda26fa6c9 100644 --- a/dbms/src/Debug/astToExecutor.h +++ b/dbms/src/Debug/astToExecutor.h @@ -113,7 +113,7 @@ struct Executor { index_++; } - virtual bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) + virtual bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) = 0; virtual void toMPPSubPlan(size_t & executor_index, const DAGProperties & properties, std::unordered_map, std::shared_ptr>> & exchange_map) { @@ -133,7 +133,7 @@ struct ExchangeSender : Executor , partition_keys(partition_keys_) {} void columnPrune(std::unordered_set &) override { throw Exception("Should not reach here"); } - bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) override; + bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) override; }; struct ExchangeReceiver : Executor @@ -146,7 +146,7 @@ struct ExchangeReceiver : Executor , fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count_) {} void columnPrune(std::unordered_set &) override { throw Exception("Should not reach here"); } - bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context &) override; + bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context &) override; }; struct TableScan : public Executor @@ -158,7 +158,7 @@ struct TableScan : public Executor , table_info(table_info_) {} void 
columnPrune(std::unordered_set & used_columns) override; - bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t, const MPPInfo &, const Context &) override; + bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t, const MPPInfo &, const Context &) override; void toMPPSubPlan(size_t &, const DAGProperties &, std::unordered_map, std::shared_ptr>> &) override {} @@ -190,7 +190,7 @@ struct Selection : public Executor : Executor(index_, "selection_" + std::to_string(index_), output_schema_) , conditions(std::move(conditions_)) {} - bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) override; + bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) override; void columnPrune(std::unordered_set & used_columns) override; }; @@ -203,7 +203,7 @@ struct TopN : public Executor , order_columns(std::move(order_columns_)) , limit(limit_) {} - bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) override; + bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) override; void columnPrune(std::unordered_set & used_columns) override; }; @@ -214,7 +214,7 @@ struct Limit : public Executor : Executor(index_, "limit_" + std::to_string(index_), output_schema_) , limit(limit_) {} - bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) override; + bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) override; void columnPrune(std::unordered_set & used_columns) override; }; @@ -234,7 +234,7 @@ struct Aggregation : public Executor , gby_exprs(std::move(gby_exprs_)) , is_final_mode(is_final_mode_) {} - bool toTiPBExecutor(tipb::Executor * tipb_executor, 
uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) override; + bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) override; void columnPrune(std::unordered_set & used_columns) override; void toMPPSubPlan(size_t & executor_index, const DAGProperties & properties, std::unordered_map, std::shared_ptr>> & exchange_map) override; }; @@ -246,7 +246,7 @@ struct Project : public Executor : Executor(index_, "project_" + std::to_string(index_), output_schema_) , exprs(std::move(exprs_)) {} - bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) override; + bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) override; void columnPrune(std::unordered_set & used_columns) override; }; @@ -272,9 +272,9 @@ struct Join : Executor const DAGSchema & schema, tipb::Expr * tipb_key, tipb::FieldType * tipb_field_type, - uint32_t collator_id); + int32_t collator_id); - bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) override; + bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) override; void toMPPSubPlan(size_t & executor_index, const DAGProperties & properties, std::unordered_map, std::shared_ptr>> & exchange_map) override; }; @@ -309,7 +309,7 @@ struct Window : Executor // Currently only use Window Executor in Unit Test which don't call columnPrume. // TODO: call columnPrune in unit test and further benchmark test to eliminate compute process. 
void columnPrune(std::unordered_set &) override { throw Exception("Should not reach here"); } - bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) override; + bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) override; }; struct Sort : Executor @@ -328,7 +328,7 @@ struct Sort : Executor // Currently only use Sort Executor in Unit Test which don't call columnPrume. // TODO: call columnPrune in unit test and further benchmark test to eliminate compute process. void columnPrune(std::unordered_set &) override { throw Exception("Should not reach here"); } - bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context & context) override; + bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) override; }; } // namespace mock diff --git a/dbms/src/Flash/tests/gtest_collation.cpp b/dbms/src/Flash/tests/gtest_collation.cpp index 6cb748f2b46..4cadce18300 100644 --- a/dbms/src/Flash/tests/gtest_collation.cpp +++ b/dbms/src/Flash/tests/gtest_collation.cpp @@ -14,6 +14,9 @@ #include #include +#include +#include +#include namespace DB { @@ -24,9 +27,11 @@ class ExecutorCollation : public DB::tests::ExecutorTest { public: using ColStringNullableType = std::optional::FieldType>; + using ColStringType = std::optional::FieldType>; using ColUInt64Type = typename TypeTraits::FieldType; using ColumnWithNullableString = std::vector; + using ColumnWithString = std::vector; using ColumnWithUInt64 = std::vector; void initializeContext() override @@ -40,15 +45,44 @@ class ExecutorCollation : public DB::tests::ExecutorTest context.addMockTable({db_name, chinese_table}, {{chinese_col_name, TiDB::TP::TypeString}}, {toNullableVec(chinese_col_name, chinese_col)}); + + context.addMockTable(join_table, "t1", {{"a", 
TiDB::TP::TypeLong}, {"b", TiDB::TP::TypeLong}}, {toVec("a", {1, 1, 3, 4}), toVec("b", {1, 1, 4, 1})}); + + context.addMockTable(join_table, "t2", {{"a", TiDB::TP::TypeLong}, {"b", TiDB::TP::TypeLong}}, {toVec("a", {1, 4, 2}), toVec("b", {2, 6, 2})}); + + /// For topn + context.addMockTable({db_name, topn_table}, + {{topn_col, TiDB::TP::TypeString}}, + {toNullableVec("_col", ColumnWithString{"col0-0", "col0-1", "col0-2", {}, "col0-4", {}, "col0-6", "col0-7"})}); + + /// For projection + context.addMockTable({db_name, proj_table}, + {{proj_col[0], TiDB::TP::TypeString}, + {proj_col[1], TiDB::TP::TypeString}}, + {toNullableVec(proj_col[0], ColumnWithString{"col0-0", "col0-1", "", "col0-2", {}, "col0-3", ""}), + toNullableVec(proj_col[1], ColumnWithString{"", "col1-1", "", "col1-0", {}, "col1-3", "col1-2"})}); + + /// For limit + context.addMockTable({db_name, limit_table}, + {{limit_col, TiDB::TP::TypeString}}, + {toNullableVec(limit_col, ColumnWithString{"col0-0", {}, "col0-2", "col0-3", {}, "col0-5", "col0-6", "col0-7"})}); + + /// For ExchangeSender + context.addExchangeRelationSchema(sender_name, {{"s1", TiDB::TP::TypeString}, {"s2", TiDB::TP::TypeString}, {"s3", TiDB::TP::TypeString}}); } void setAndCheck(const String & table_name, const String & col_name, Int32 collation, const ColumnsWithTypeAndName & expect) { context.setCollation(collation); auto request = context.scan(db_name, table_name).aggregation(MockAstVec{}, {col(col_name)}).project({col_name}).build(context); + std::cout << request->DebugString() << std::endl; ASSERT_COLUMNS_EQ_UR(expect, executeStreams(request, 1)); } + void checkExecutorCollation(std::shared_ptr dag_request) const; + void checkScalarFunctionCollation(std::shared_ptr dag_request) const; + void addExpr(std::queue & exprs, const tipb::Expr * const expr) const; + /// Prepare some names const String db_name{"test_db"}; const String table_name{"collation_table"}; @@ -58,8 +92,288 @@ class ExecutorCollation : public 
DB::tests::ExecutorTest const String chinese_table{"chinese"}; const String chinese_col_name{"col"}; const ColumnWithNullableString chinese_col{"北京", "北京 ", "北bei京", "北Bei京", "北bei京 ", "上海", "上海 ", "shanghai ", "ShangHai", "ShangHai "}; + + const String join_table{"join_table"}; + const String topn_table{"topn_table"}; + const String topn_col{"topn_col"}; + const String proj_table{"proj_table"}; + const std::vector proj_col{"proj_col0", "proj_col1"}; + const String limit_table{"limit_table"}; + const String limit_col{"limit_col"}; + const String sender_name{"sender"}; + + /// scalar functions whose collation must be set(Some more scalar functions may be added in the future) + std::set scalar_func_need_collation{tipb::ScalarFuncSig::EQInt, tipb::ScalarFuncSig::NEInt, tipb::ScalarFuncSig::GTInt, tipb::ScalarFuncSig::LTInt}; }; +/// Collect scalar functions +void ExecutorCollation::addExpr(std::queue & exprs, const tipb::Expr * const expr) const +{ + if (expr->tp() == tipb::ExprType::ScalarFunc) /// only add scalar function + exprs.push(expr); + int children_size = expr->children_size(); + + /// recursively add expression + for (int i = 0; i < children_size; ++i) + addExpr(exprs, &(expr->children(i))); +} + +void ExecutorCollation::checkExecutorCollation(std::shared_ptr dag_request) const +{ + std::queue executors; + tipb::Executor * executor = dag_request->mutable_root_executor(); + executors.push(executor); + + while (!executors.empty()) + { + tipb::Executor * executor = executors.back(); + executors.pop(); + tipb::ExecType type = executor->tp(); + + switch (type) + { + case tipb::ExecType::TypeJoin: /// need collation + { + tipb::Join * join = executor->mutable_join(); + int probe_type_size = join->probe_types_size(); + int build_type_size = join->build_types_size(); + + for (int i = 0; i < probe_type_size; ++i) + { + const tipb::FieldType & probe_type = join->probe_types(i); + ASSERT_NE(probe_type.collate(), 0); /// Check collation + } + + for (int i = 0; i < 
build_type_size; ++i) + { + const tipb::FieldType & build_type = join->build_types(i); + ASSERT_NE(build_type.collate(), 0); /// /// Check collation + } + + /// Push child executors into queue + int children_size = join->children_size(); + for (int i = 0; i < children_size; ++i) + executors.push(join->mutable_children(i)); + break; + } + case tipb::ExecType::TypeExchangeReceiver: /// need collation + { + tipb::ExchangeReceiver * exchange_receiver = executor->mutable_exchange_receiver(); + int field_types_size = exchange_receiver->field_types_size(); + + for (int i = 0; i < field_types_size; ++i) + { + const tipb::FieldType & field_type = exchange_receiver->field_types(i); + ASSERT_NE(field_type.collate(), 0); /// Check collation + } + break; + } + case tipb::ExecType::TypeExchangeSender: /// need collation + { + tipb::ExchangeSender * exchange_sender = executor->mutable_exchange_sender(); + int types_size = exchange_sender->types_size(); + int all_field_types_size = exchange_sender->all_field_types_size(); + + for (int i = 0; i < types_size; ++i) + { + const tipb::FieldType & field_type = exchange_sender->types(i); + ASSERT_NE(field_type.collate(), 0); /// Check collation + } + + for (int i = 0; i < all_field_types_size; ++i) + { + const tipb::FieldType & field_type = exchange_sender->all_field_types(i); + ASSERT_NE(field_type.collate(), 0); /// Check collation + } + + /// Push child executors + if (exchange_sender->has_child()) + executors.push(exchange_sender->mutable_child()); + break; + } + case tipb::ExecType::TypeSelection: + { + tipb::Selection * selection = executor->mutable_selection(); + + if (selection->has_child()) + executors.push(selection->mutable_child()); + } + case tipb::ExecType::TypeAggregation: + { + tipb::Aggregation * aggregation = executor->mutable_aggregation(); + + if (aggregation->has_child()) + executors.push(aggregation->mutable_child()); + break; + } + case tipb::ExecType::TypeTopN: + { + tipb::TopN * topn = executor->mutable_topn(); + 
+ if (topn->has_child()) + executors.push(topn->mutable_child()); + break; + } + case tipb::ExecType::TypeLimit: + { + tipb::Limit * limit = executor->mutable_limit(); + + if (limit->has_child()) + executors.push(limit->mutable_child()); + } + case tipb::ExecType::TypeProjection: + { + tipb::Projection * projection = executor->mutable_projection(); + + if (projection->has_child()) + executors.push(projection->mutable_child()); + break; + } + case tipb::ExecType::TypeWindow: + { + tipb::Window * window = executor->mutable_window(); + + if (window->has_child()) + executors.push(window->mutable_child()); + } + case tipb::ExecType::TypeTableScan: + break; /// Do nothing + default: + { + auto exception_str = fmt::format("Unhandled executor {}", type); + throw Exception(exception_str); + } + } + } +} + +void ExecutorCollation::checkScalarFunctionCollation(std::shared_ptr dag_request) const +{ + std::queue executors; + std::queue exprs; + tipb::Executor * executor = dag_request->mutable_root_executor(); + executors.push(executor); + + using MultiExprs = ::google::protobuf::RepeatedPtrField; + auto add_multi_exprs = [&](const MultiExprs & field, int size) { + for (int i = 0; i < size; ++i) + addExpr(exprs, &(field.Get(i))); + }; + + /// Firstly, collect scalar functions + while (!executors.empty()) + { + tipb::Executor * executor = executors.back(); + executors.pop(); + tipb::ExecType type = executor->tp(); + + switch (type) + { + case tipb::ExecType::TypeJoin: /// need collation + { + tipb::Join * join = executor->mutable_join(); + + add_multi_exprs(join->left_join_keys(), join->left_join_keys_size()); + add_multi_exprs(join->right_join_keys(), join->right_join_keys_size()); + add_multi_exprs(join->left_conditions(), join->left_conditions_size()); + add_multi_exprs(join->right_conditions(), join->right_conditions_size()); + add_multi_exprs(join->other_conditions(), join->other_conditions_size()); + add_multi_exprs(join->other_eq_conditions_from_in(), 
join->other_eq_conditions_from_in_size()); + + /// Push child executors into queue + int children_size = join->children_size(); + for (int i = 0; i < children_size; ++i) + executors.push(join->mutable_children(i)); + break; + } + case tipb::ExecType::TypeExchangeReceiver: /// need collation + break; /// Do nothing + case tipb::ExecType::TypeExchangeSender: /// need collation + { + tipb::ExchangeSender * exchange_sender = executor->mutable_exchange_sender(); + add_multi_exprs(exchange_sender->partition_keys(), exchange_sender->partition_keys_size()); + + /// Push child executors + if (exchange_sender->has_child()) + executors.push(exchange_sender->mutable_child()); + break; + } + case tipb::ExecType::TypeSelection: + { + tipb::Selection * selection = executor->mutable_selection(); + add_multi_exprs(selection->conditions(), selection->conditions_size()); + + if (selection->has_child()) + executors.push(selection->mutable_child()); + } + case tipb::ExecType::TypeAggregation: + { + tipb::Aggregation * aggregation = executor->mutable_aggregation(); + add_multi_exprs(aggregation->group_by(), aggregation->group_by_size()); + add_multi_exprs(aggregation->agg_func(), aggregation->agg_func_size()); + + if (aggregation->has_child()) + executors.push(aggregation->mutable_child()); + break; + } + case tipb::ExecType::TypeTopN: + { + tipb::TopN * topn = executor->mutable_topn(); + + if (topn->has_child()) + executors.push(topn->mutable_child()); + break; + } + case tipb::ExecType::TypeLimit: + { + tipb::Limit * limit = executor->mutable_limit(); + + if (limit->has_child()) + executors.push(limit->mutable_child()); + } + case tipb::ExecType::TypeProjection: + { + tipb::Projection * projection = executor->mutable_projection(); + add_multi_exprs(projection->exprs(), projection->exprs_size()); + + if (projection->has_child()) + executors.push(projection->mutable_child()); + break; + } + case tipb::ExecType::TypeWindow: + { + tipb::Window * window = executor->mutable_window(); + 
add_multi_exprs(window->func_desc(), window->func_desc_size()); + + if (window->has_child()) + executors.push(window->mutable_child()); + } + case tipb::ExecType::TypeTableScan: + break; /// Do nothing + default: + { + auto exception_str = fmt::format("Unhandled executor {}", type); + throw Exception(exception_str); + } + } + } + + /// Secondly, check collation of scalar functions + while (!exprs.empty()) + { + const tipb::Expr * expr = exprs.back(); + exprs.pop(); + + /// We only guarantee the collations of scalar functions that have been add into "scalar_func_need_collation" to be set + auto iter = scalar_func_need_collation.find(expr->sig()); + if (iter == scalar_func_need_collation.end()) + continue; /// Ignore this scalar function + + /// Check + ASSERT_NE(expr->field_type().collate(), 0); + } +} + /// Guarantee that test framework has correctly supported the collation. TEST_F(ExecutorCollation, Verification) try @@ -80,5 +394,42 @@ try } CATCH +/// Guarantee the collations of executors or functions have been set +TEST_F(ExecutorCollation, CheckCollation) +try +{ + { + /// Check collation for executors + auto request = context.scan(join_table, "t1") + .join(context.scan(join_table, "t2"), {col("a")}, ASTTableJoin::Kind::Inner) + .aggregation({Max(col("a")), Min(col("a")), Count(col("a"))}, {col("b")}) + .build(context); + checkExecutorCollation(request); + + request = context.scan(db_name, topn_table).topN(topn_col, true, 100).build(context); + checkExecutorCollation(request); + + request = context.scan(db_name, proj_table).project(MockAstVec{col(proj_col[0])}).build(context); + checkExecutorCollation(request); + + request = context.scan(db_name, limit_table).limit(100).build(context); + checkExecutorCollation(request); + + request = context.receive(sender_name).project({"s1", "s2", "s3"}).exchangeSender(tipb::Broadcast).build(context); + checkExecutorCollation(request); + + /// TODO test window executor + } + + { + /// Check collation for expressions + auto 
request = context.scan(db_name, proj_table).project(MockAstVec{eq(col(proj_col[0]), col(proj_col[0])), gt(col(proj_col[0]), col(proj_col[1]))}).build(context); + checkScalarFunctionCollation(request); + + /// TODO more scalar functions to + } +} +CATCH + } // namespace tests } // namespace DB From 46e89427c3bda7d0f4ae163f33a40825a087e0f8 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Mon, 25 Jul 2022 15:53:23 +0800 Subject: [PATCH 5/7] tweaking --- dbms/src/Flash/tests/gtest_collation.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Flash/tests/gtest_collation.cpp b/dbms/src/Flash/tests/gtest_collation.cpp index 4cadce18300..7ced0982361 100644 --- a/dbms/src/Flash/tests/gtest_collation.cpp +++ b/dbms/src/Flash/tests/gtest_collation.cpp @@ -426,7 +426,7 @@ try auto request = context.scan(db_name, proj_table).project(MockAstVec{eq(col(proj_col[0]), col(proj_col[0])), gt(col(proj_col[0]), col(proj_col[1]))}).build(context); checkScalarFunctionCollation(request); - /// TODO more scalar functions to + /// TODO more scalar functions to test... 
} } CATCH From 5164252af9ca82088d63f11dc3939413649d4e54 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Mon, 25 Jul 2022 16:21:06 +0800 Subject: [PATCH 6/7] format --- dbms/src/Flash/tests/gtest_collation.cpp | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/dbms/src/Flash/tests/gtest_collation.cpp b/dbms/src/Flash/tests/gtest_collation.cpp index 7ced0982361..f7b10a435ad 100644 --- a/dbms/src/Flash/tests/gtest_collation.cpp +++ b/dbms/src/Flash/tests/gtest_collation.cpp @@ -14,9 +14,10 @@ #include #include -#include -#include + #include +#include +#include namespace DB { @@ -123,7 +124,7 @@ void ExecutorCollation::checkExecutorCollation(std::shared_ptr std::queue executors; tipb::Executor * executor = dag_request->mutable_root_executor(); executors.push(executor); - + while (!executors.empty()) { tipb::Executor * executor = executors.back(); @@ -190,7 +191,7 @@ void ExecutorCollation::checkExecutorCollation(std::shared_ptr if (exchange_sender->has_child()) executors.push(exchange_sender->mutable_child()); break; - } + } case tipb::ExecType::TypeSelection: { tipb::Selection * selection = executor->mutable_selection(); @@ -297,7 +298,7 @@ void ExecutorCollation::checkScalarFunctionCollation(std::shared_ptrhas_child()) executors.push(exchange_sender->mutable_child()); break; - } + } case tipb::ExecType::TypeSelection: { tipb::Selection * selection = executor->mutable_selection(); @@ -368,7 +369,7 @@ void ExecutorCollation::checkScalarFunctionCollation(std::shared_ptrsig()); if (iter == scalar_func_need_collation.end()) continue; /// Ignore this scalar function - + /// Check ASSERT_NE(expr->field_type().collate(), 0); } @@ -401,9 +402,9 @@ try { /// Check collation for executors auto request = context.scan(join_table, "t1") - .join(context.scan(join_table, "t2"), {col("a")}, ASTTableJoin::Kind::Inner) - .aggregation({Max(col("a")), Min(col("a")), Count(col("a"))}, {col("b")}) - .build(context); + 
.join(context.scan(join_table, "t2"), {col("a")}, ASTTableJoin::Kind::Inner) + .aggregation({Max(col("a")), Min(col("a")), Count(col("a"))}, {col("b")}) + .build(context); checkExecutorCollation(request); request = context.scan(db_name, topn_table).topN(topn_col, true, 100).build(context); @@ -411,7 +412,7 @@ try request = context.scan(db_name, proj_table).project(MockAstVec{col(proj_col[0])}).build(context); checkExecutorCollation(request); - + request = context.scan(db_name, limit_table).limit(100).build(context); checkExecutorCollation(request); From 98807668ab7c4665aa07c6cfde26af912d563b83 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Tue, 26 Jul 2022 10:53:47 +0800 Subject: [PATCH 7/7] update --- dbms/src/Flash/tests/gtest_collation.cpp | 243 ++++++++--------------- dbms/src/TestUtils/mockExecutor.h | 9 +- 2 files changed, 90 insertions(+), 162 deletions(-) diff --git a/dbms/src/Flash/tests/gtest_collation.cpp b/dbms/src/Flash/tests/gtest_collation.cpp index f7b10a435ad..abae9782774 100644 --- a/dbms/src/Flash/tests/gtest_collation.cpp +++ b/dbms/src/Flash/tests/gtest_collation.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+#include #include #include @@ -24,6 +25,7 @@ namespace DB namespace tests { +/// Note: These tests are for the correctness of the test framework class ExecutorCollation : public DB::tests::ExecutorTest { public: @@ -76,11 +78,10 @@ class ExecutorCollation : public DB::tests::ExecutorTest { context.setCollation(collation); auto request = context.scan(db_name, table_name).aggregation(MockAstVec{}, {col(col_name)}).project({col_name}).build(context); - std::cout << request->DebugString() << std::endl; - ASSERT_COLUMNS_EQ_UR(expect, executeStreams(request, 1)); + ASSERT_COLUMNS_EQ_UR(expect, executeStreams(request)); } - void checkExecutorCollation(std::shared_ptr dag_request) const; + std::queue checkExecutorCollation(std::shared_ptr dag_request) const; void checkScalarFunctionCollation(std::shared_ptr dag_request) const; void addExpr(std::queue & exprs, const tipb::Expr * const expr) const; @@ -119,124 +120,74 @@ void ExecutorCollation::addExpr(std::queue & exprs, const ti addExpr(exprs, &(expr->children(i))); } -void ExecutorCollation::checkExecutorCollation(std::shared_ptr dag_request) const +std::queue ExecutorCollation::checkExecutorCollation(std::shared_ptr dag_request) const { - std::queue executors; - tipb::Executor * executor = dag_request->mutable_root_executor(); - executors.push(executor); - - while (!executors.empty()) - { - tipb::Executor * executor = executors.back(); - executors.pop(); - tipb::ExecType type = executor->tp(); + std::queue exec_collation_absent; + + auto checkExecutor = [&](const tipb::Executor & executor) -> bool { +#define CHECK(probe_type, exec_type) \ + do \ + { \ + if (probe_type.collate() == 0) \ + { \ + exec_collation_absent.push(exec_type); \ + return true; \ + } \ + } while (0); + tipb::ExecType type = executor.tp(); switch (type) { case tipb::ExecType::TypeJoin: /// need collation { - tipb::Join * join = executor->mutable_join(); - int probe_type_size = join->probe_types_size(); - int build_type_size = 
join->build_types_size(); + const tipb::Join & join = executor.join(); + int probe_type_size = join.probe_types_size(); + int build_type_size = join.build_types_size(); for (int i = 0; i < probe_type_size; ++i) - { - const tipb::FieldType & probe_type = join->probe_types(i); - ASSERT_NE(probe_type.collate(), 0); /// Check collation - } + CHECK(join.probe_types(i), tipb::ExecType::TypeJoin); for (int i = 0; i < build_type_size; ++i) - { - const tipb::FieldType & build_type = join->build_types(i); - ASSERT_NE(build_type.collate(), 0); /// /// Check collation - } - - /// Push child executors into queue - int children_size = join->children_size(); - for (int i = 0; i < children_size; ++i) - executors.push(join->mutable_children(i)); + CHECK(join.build_types(i), tipb::ExecType::TypeJoin); + break; } case tipb::ExecType::TypeExchangeReceiver: /// need collation { - tipb::ExchangeReceiver * exchange_receiver = executor->mutable_exchange_receiver(); - int field_types_size = exchange_receiver->field_types_size(); + const tipb::ExchangeReceiver & exchange_receiver = executor.exchange_receiver(); + int field_types_size = exchange_receiver.field_types_size(); for (int i = 0; i < field_types_size; ++i) - { - const tipb::FieldType & field_type = exchange_receiver->field_types(i); - ASSERT_NE(field_type.collate(), 0); /// Check collation - } + CHECK(exchange_receiver.field_types(i), tipb::ExecType::TypeExchangeReceiver); + break; } case tipb::ExecType::TypeExchangeSender: /// need collation { - tipb::ExchangeSender * exchange_sender = executor->mutable_exchange_sender(); - int types_size = exchange_sender->types_size(); - int all_field_types_size = exchange_sender->all_field_types_size(); + const tipb::ExchangeSender & exchange_sender = executor.exchange_sender(); + int types_size = exchange_sender.types_size(); + int all_field_types_size = exchange_sender.all_field_types_size(); for (int i = 0; i < types_size; ++i) - { - const tipb::FieldType & field_type = 
exchange_sender->types(i); - ASSERT_NE(field_type.collate(), 0); /// Check collation - } + CHECK(exchange_sender.types(i), tipb::ExecType::TypeExchangeSender); for (int i = 0; i < all_field_types_size; ++i) - { - const tipb::FieldType & field_type = exchange_sender->all_field_types(i); - ASSERT_NE(field_type.collate(), 0); /// Check collation - } - - /// Push child executors - if (exchange_sender->has_child()) - executors.push(exchange_sender->mutable_child()); + CHECK(exchange_sender.all_field_types(i), tipb::ExecType::TypeExchangeSender); + break; } case tipb::ExecType::TypeSelection: - { - tipb::Selection * selection = executor->mutable_selection(); - - if (selection->has_child()) - executors.push(selection->mutable_child()); - } + break; /// Do nothing case tipb::ExecType::TypeAggregation: - { - tipb::Aggregation * aggregation = executor->mutable_aggregation(); - - if (aggregation->has_child()) - executors.push(aggregation->mutable_child()); - break; - } + break; /// Do nothing case tipb::ExecType::TypeTopN: - { - tipb::TopN * topn = executor->mutable_topn(); - - if (topn->has_child()) - executors.push(topn->mutable_child()); - break; - } + break; /// Do nothing case tipb::ExecType::TypeLimit: - { - tipb::Limit * limit = executor->mutable_limit(); - - if (limit->has_child()) - executors.push(limit->mutable_child()); - } + break; /// Do nothing case tipb::ExecType::TypeProjection: - { - tipb::Projection * projection = executor->mutable_projection(); - - if (projection->has_child()) - executors.push(projection->mutable_child()); - break; - } + break; /// Do nothing case tipb::ExecType::TypeWindow: - { - tipb::Window * window = executor->mutable_window(); - - if (window->has_child()) - executors.push(window->mutable_child()); - } + break; /// Do nothing case tipb::ExecType::TypeTableScan: break; /// Do nothing default: @@ -245,15 +196,17 @@ void ExecutorCollation::checkExecutorCollation(std::shared_ptr throw Exception(exception_str); } } - } + + return true; /// 
Always traverse the executors + }; + + traverseExecutors(dag_request.get(), checkExecutor); + return exec_collation_absent; } void ExecutorCollation::checkScalarFunctionCollation(std::shared_ptr dag_request) const { - std::queue executors; std::queue exprs; - tipb::Executor * executor = dag_request->mutable_root_executor(); - executors.push(executor); using MultiExprs = ::google::protobuf::RepeatedPtrField; auto add_multi_exprs = [&](const MultiExprs & field, int size) { @@ -261,93 +214,59 @@ void ExecutorCollation::checkScalarFunctionCollation(std::shared_ptrtp(); + auto collectExprs = [&](const tipb::Executor & executor) -> bool { + tipb::ExecType type = executor.tp(); switch (type) { case tipb::ExecType::TypeJoin: /// need collation { - tipb::Join * join = executor->mutable_join(); - - add_multi_exprs(join->left_join_keys(), join->left_join_keys_size()); - add_multi_exprs(join->right_join_keys(), join->right_join_keys_size()); - add_multi_exprs(join->left_conditions(), join->left_conditions_size()); - add_multi_exprs(join->right_conditions(), join->right_conditions_size()); - add_multi_exprs(join->other_conditions(), join->other_conditions_size()); - add_multi_exprs(join->other_eq_conditions_from_in(), join->other_eq_conditions_from_in_size()); - - /// Push child executors into queue - int children_size = join->children_size(); - for (int i = 0; i < children_size; ++i) - executors.push(join->mutable_children(i)); + const tipb::Join & join = executor.join(); + + add_multi_exprs(join.left_join_keys(), join.left_join_keys_size()); + add_multi_exprs(join.right_join_keys(), join.right_join_keys_size()); + add_multi_exprs(join.left_conditions(), join.left_conditions_size()); + add_multi_exprs(join.right_conditions(), join.right_conditions_size()); + add_multi_exprs(join.other_conditions(), join.other_conditions_size()); + add_multi_exprs(join.other_eq_conditions_from_in(), join.other_eq_conditions_from_in_size()); break; } case tipb::ExecType::TypeExchangeReceiver: 
/// need collation break; /// Do nothing case tipb::ExecType::TypeExchangeSender: /// need collation { - tipb::ExchangeSender * exchange_sender = executor->mutable_exchange_sender(); - add_multi_exprs(exchange_sender->partition_keys(), exchange_sender->partition_keys_size()); - - /// Push child executors - if (exchange_sender->has_child()) - executors.push(exchange_sender->mutable_child()); + const tipb::ExchangeSender & exchange_sender = executor.exchange_sender(); + add_multi_exprs(exchange_sender.partition_keys(), exchange_sender.partition_keys_size()); break; } case tipb::ExecType::TypeSelection: { - tipb::Selection * selection = executor->mutable_selection(); - add_multi_exprs(selection->conditions(), selection->conditions_size()); - - if (selection->has_child()) - executors.push(selection->mutable_child()); + const tipb::Selection & selection = executor.selection(); + add_multi_exprs(selection.conditions(), selection.conditions_size()); + break; } case tipb::ExecType::TypeAggregation: { - tipb::Aggregation * aggregation = executor->mutable_aggregation(); - add_multi_exprs(aggregation->group_by(), aggregation->group_by_size()); - add_multi_exprs(aggregation->agg_func(), aggregation->agg_func_size()); - - if (aggregation->has_child()) - executors.push(aggregation->mutable_child()); + const tipb::Aggregation & aggregation = executor.aggregation(); + add_multi_exprs(aggregation.group_by(), aggregation.group_by_size()); + add_multi_exprs(aggregation.agg_func(), aggregation.agg_func_size()); break; } case tipb::ExecType::TypeTopN: - { - tipb::TopN * topn = executor->mutable_topn(); - - if (topn->has_child()) - executors.push(topn->mutable_child()); - break; - } + break; /// Do nothing case tipb::ExecType::TypeLimit: - { - tipb::Limit * limit = executor->mutable_limit(); - - if (limit->has_child()) - executors.push(limit->mutable_child()); - } + break; /// Do nothing case tipb::ExecType::TypeProjection: { - tipb::Projection * projection = 
executor->mutable_projection(); - add_multi_exprs(projection->exprs(), projection->exprs_size()); - - if (projection->has_child()) - executors.push(projection->mutable_child()); + const tipb::Projection & projection = executor.projection(); + add_multi_exprs(projection.exprs(), projection.exprs_size()); break; } case tipb::ExecType::TypeWindow: { - tipb::Window * window = executor->mutable_window(); - add_multi_exprs(window->func_desc(), window->func_desc_size()); - - if (window->has_child()) - executors.push(window->mutable_child()); + const tipb::Window & window = executor.window(); + add_multi_exprs(window.func_desc(), window.func_desc_size()); + break; } case tipb::ExecType::TypeTableScan: break; /// Do nothing @@ -357,7 +276,11 @@ void ExecutorCollation::checkScalarFunctionCollation(std::shared_ptr